Source code for nipyapi.canvas

# -*- coding: utf-8 -*-

"""
For interactions with the NiFi Canvas.
"""

from __future__ import absolute_import
import logging
import six
import nipyapi

__all__ = [
    "get_root_pg_id", "recurse_flow", "get_flow", "get_process_group_status",
    "get_process_group", "list_all_process_groups", "delete_process_group",
    "schedule_process_group", "create_process_group", "list_all_processors",
    "list_all_processor_types", "get_processor_type", 'create_processor',
    'delete_processor', 'get_processor', 'schedule_processor',
    'update_processor', 'get_variable_registry', 'update_variable_registry',
    'get_connections', 'purge_connection', 'purge_process_group',
    'get_bulletins', 'get_bulletin_board'
]

log = logging.getLogger(__name__)


[docs]def get_root_pg_id(): """ Convenience function to return the UUID of the Root Process Group Returns (str): The UUID of the root PG """ return nipyapi.nifi.FlowApi().get_process_group_status('root')\ .process_group_status.id
[docs]def recurse_flow(pg_id='root'): """ Returns information about a Process Group and all its Child Flows. Recurses the child flows by appending each process group with a 'nipyapi_extended' parameter which contains the child process groups, etc. Args: pg_id (str): The Process Group UUID Returns: (ProcessGroupFlowEntity): enriched NiFi Flow object """ assert isinstance(pg_id, six.string_types), "pg_id should be a string" def _walk_flow(node): """This recursively extends the ProcessGroupEntity to contain the ProcessGroupFlowEntity of each of it's child process groups. So you can have the entire canvas in a single object. """ if isinstance(node, nipyapi.nifi.ProcessGroupFlowEntity): for pg in node.process_group_flow.flow.process_groups: pg.__setattr__( 'nipyapi_extended', recurse_flow(pg.id) ) return node return _walk_flow(get_flow(pg_id))
[docs]def get_flow(pg_id='root'): """ Returns information about a Process Group and flow. This surfaces the native implementation, for the recursed implementation see 'recurse_flow' Args: pg_id (str): id of the Process Group to retrieve, defaults to the root process group if not set Returns: (ProcessGroupFlowEntity): The Process Group object """ assert isinstance(pg_id, six.string_types), "pg_id should be a string" try: return nipyapi.nifi.FlowApi().get_flow(pg_id) except nipyapi.nifi.rest.ApiException as err: raise ValueError(err.body)
[docs]def get_process_group_status(pg_id='root', detail='names'): """ Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings Note that there is also a 'process group status' command available, but it returns a subset of this data anyway, and this call is more useful Args: pg_id (str): The UUID of the Process Group detail (str): 'names' or 'all'; whether to return a simple dict of name:id pairings, or the full details. Defaults to 'names' Returns: (ProcessGroupEntity): The Process Group Entity including the status """ assert isinstance(pg_id, six.string_types), "pg_id should be a string" assert detail in ['names', 'all'], "detail should be either 'names' or " \ "'all'" raw = nipyapi.nifi.ProcessGroupsApi().get_process_group(id=pg_id) if detail == 'names': out = { raw.component.name: raw.component.id } return out return raw
[docs]def get_process_group(identifier, identifier_type='name'): """ Filters the list of all process groups against a given identifier string occuring in a given identifier_type field. Args: identifier (str): the string to filter the list for identifier_type (str): the field to filter on, set in config.py Returns: None for no matches, Single Object for unique match, list(Objects) for multiple matches """ assert isinstance(identifier, six.string_types) assert identifier_type in ['name', 'id'] try: if identifier_type == 'id': # assuming unique fetch of pg id # implementing separately to avoid recursing entire canvas out = nipyapi.nifi.ProcessGroupsApi().get_process_group(identifier) else: obj = list_all_process_groups() out = nipyapi.utils.filter_obj(obj, identifier, identifier_type) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) return out
[docs]def list_all_process_groups(): """ Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas. Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas Returns: list[ProcessGroupEntity] """ def flatten(parent_pg): """ Recursively flattens the native datatypes into a generic list. Note that the root is a special case as it has no parent Args: parent_pg (ProcessGroupEntity): object to flatten Yields: Generator for all ProcessGroupEntities, eventually """ for child_pg in parent_pg.process_group_flow.flow.process_groups: for sub in flatten(child_pg.nipyapi_extended): yield sub yield child_pg root_flow = recurse_flow('root') out = list(flatten(root_flow)) # This duplicates the nipyapi_extended structure to the root case pga_handle = nipyapi.nifi.ProcessGroupsApi() root_entity = pga_handle.get_process_group('root') root_entity.__setattr__('nipyapi_extended', root_flow) out.append(root_entity) return out
[docs]def list_all_processors(): """ Returns a flat list of all Processors anywhere on the canvas Returns: list[ProcessorEntity] """ def flattener(): """ Memory efficient flattener, sort of. :return: yield's a ProcessEntity """ for pg in list_all_process_groups(): for proc in pg.nipyapi_extended.process_group_flow.flow.processors: yield proc return list(flattener())
[docs]def schedule_process_group(process_group_id, scheduled): """ Start or Stop a Process Group and all components. Note that this doesn't guarantee that all components have started, as some may be in Invalid states. Args: process_group_id (str): The UUID of the target Process Group scheduled (bool): True to start, False to stop Returns: (bool): True of successfully scheduled, False if not """ assert isinstance(process_group_id, six.string_types) assert isinstance(scheduled, bool) def _running_schedule_process_group(pg_id_): test_obj = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id_) if test_obj.status.aggregate_snapshot.active_thread_count == 0: return True return False assert isinstance( get_process_group(process_group_id, 'id'), nipyapi.nifi.ProcessGroupEntity ) result = schedule_components( pg_id=process_group_id, scheduled=scheduled ) # If target scheduled state was successfully updated if result: # If we want to stop the processor if not scheduled: # Test that the processor threads have halted stop_test = nipyapi.utils.wait_to_complete( _running_schedule_process_group, process_group_id ) if stop_test: # Return True if we stopped the Process Group return result # Return False if we scheduled a stop, but it didn't stop return False # Return the True or False result if we were trying to start it return result
[docs]def delete_process_group(process_group, force=False, refresh=True): """ Deletes a given Process Group, with optional prejudice. Args: process_group (ProcessGroupEntity): The target Process Group force (bool): Stop, purge and clean the target Process Group before deletion. Experimental. refresh (bool): Whether to refresh the state first Returns: (ProcessGroupEntity: The updated object state """ assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity) assert isinstance(force, bool) assert isinstance(refresh, bool) if refresh or force: target = nipyapi.nifi.ProcessGroupsApi().get_process_group( process_group.id ) else: target = process_group if force: # Stop, drop, and roll. purge_process_group(target, stop=True) # Remove templates for template in nipyapi.templates.list_all_templates().templates: if target.id == template.template.group_id: nipyapi.templates.delete_template(template.id) try: return nipyapi.nifi.ProcessGroupsApi().remove_process_group( id=target.id, version=target.revision.version, client_id=target.revision.client_id ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def create_process_group(parent_pg, new_pg_name, location): """ Creates a new Process Group with the given name under the provided parent Process Group at the given Location Args: parent_pg (ProcessGroupEntity): The parent Process Group to create the new process group in new_pg_name (str): The name of the new Process Group location (tuple[x, y]): the x,y coordinates to place the new Process Group under the parent Returns: (ProcessGroupEntity): The new Process Group """ assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity) assert isinstance(new_pg_name, six.string_types) assert isinstance(location, tuple) try: return nipyapi.nifi.ProcessGroupsApi().create_process_group( id=parent_pg.id, body=nipyapi.nifi.ProcessGroupEntity( revision=parent_pg.revision, component=nipyapi.nifi.ProcessGroupDTO( name=new_pg_name, position=nipyapi.nifi.PositionDTO( x=float(location[0]), y=float(location[1]) ) ) ) ) except nipyapi.nifi.rest.ApiException as e: raise e
[docs]def list_all_processor_types(): """ Produces the list of all available processor types in the NiFi instance Returns: list(ProcessorTypesEntity): A native datatype containing the processors list """ try: return nipyapi.nifi.FlowApi().get_processor_types() except nipyapi.nifi.rest.ApiException as e: raise e
[docs]def get_processor_type(identifier, identifier_type='name'): """ Gets the abstract object describing a Processor, or list thereof Args: identifier (str): the string to filter the list for identifier_type (str): the field to filter on, set in config.py Returns: None for no matches, Single Object for unique match, list(Objects) for multiple matches """ try: obj = list_all_processor_types().processor_types except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) if obj: return nipyapi.utils.filter_obj(obj, identifier, identifier_type) return obj
[docs]def create_processor(parent_pg, processor, location, name=None, config=None): """ Instantiates a given processor on the canvas Args: parent_pg (ProcessGroupEntity): The parent Process Group processor (DocumentedTypeDTO): The abstract processor type object to be instantiated location (tuple[x, y]): The location coordinates name (Optional [str]): The name for the new Processor config (Optional [ProcessorConfigDTO]): A configuration object for the new processor Returns: (ProcessorEntity): The new Processor """ if name is None: processor_name = processor.type.split('.')[-1] else: processor_name = name if config is None: target_config = nipyapi.nifi.ProcessorConfigDTO() else: target_config = config try: return nipyapi.nifi.ProcessGroupsApi().create_processor( id=parent_pg.id, body=nipyapi.nifi.ProcessorEntity( revision={'version': 0}, component=nipyapi.nifi.ProcessorDTO( position=nipyapi.nifi.PositionDTO( x=float(location[0]), y=float(location[1]) ), type=processor.type, name=processor_name, config=target_config ) ) ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def get_processor(identifier, identifier_type='name'): """ Filters the list of all Processors against the given identifier string in the given identifier_type field Args: identifier (str): The String to filter against identifier_type (str): The field to apply the filter to. Set in config.py Returns: None for no matches, Single Object for unique match, list(Objects) for multiple matches """ assert isinstance(identifier, six.string_types) assert identifier_type in ['name', 'id'] try: if identifier_type == 'id': out = nipyapi.nifi.ProcessorsApi().get_processor(identifier) else: obj = list_all_processors() out = nipyapi.utils.filter_obj(obj, identifier, identifier_type) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) return out
[docs]def delete_processor(processor, refresh=True, force=False): """ Deletes a Processor from the canvas, with optional prejudice. Args: processor (ProcessorEntity): The processor to delete refresh (bool): Whether to refresh the Processor state before action force (bool): Whether to stop the Processor before deletion. Behavior may change in future releases. Experimental. Returns: (ProcessorEntity): The updated ProcessorEntity """ assert isinstance(processor, nipyapi.nifi.ProcessorEntity) assert isinstance(refresh, bool) assert isinstance(force, bool) if refresh or force: target = get_processor(processor.id, 'id') assert isinstance(target, nipyapi.nifi.ProcessorEntity) else: target = processor if force: if not schedule_processor(target, False): raise ("Could not prepare processor {0} for deletion" .format(target.id)) target = get_processor(processor.id, 'id') assert isinstance(target, nipyapi.nifi.ProcessorEntity) try: return nipyapi.nifi.ProcessorsApi().delete_processor( id=target.id, version=target.revision.version ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
def schedule_components(pg_id, scheduled, components=None): """ Changes the scheduled target state of a list of components within a given Process Group. Note that this does not guarantee that components will be Started or Stopped afterwards, merely that they will have their scheduling updated. Args: pg_id (str): The UUID of the parent Process Group scheduled (bool): True to start, False to stop components (list[ComponentType]): The list of Component Entities to schdule, e.g. ProcessorEntity's Returns: (bool): True for success, False for not """ assert isinstance( get_process_group(pg_id, 'id'), nipyapi.nifi.ProcessGroupEntity ) assert isinstance(scheduled, bool) assert components is None or isinstance(components, list) target_state = 'RUNNING' if scheduled else 'STOPPED' body = nipyapi.nifi.ScheduleComponentsEntity( id=pg_id, state=target_state ) if components: body.components = {i.id: i.revision for i in components} try: result = nipyapi.nifi.FlowApi().schedule_components( id=pg_id, body=body ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) if result.state == target_state: return True return False
[docs]def schedule_processor(processor, scheduled, refresh=True): """ Set a Processor to Start or Stop. Note that this doesn't guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor starts Args: processor (ProcessorEntity): The Processor to target scheduled (bool): True to start, False to stop refresh (bool): Whether to refresh the object before action Returns: (bool): True for success, False for failure """ assert isinstance(processor, nipyapi.nifi.ProcessorEntity) assert isinstance(scheduled, bool) assert isinstance(refresh, bool) def _running_schedule_processor(processor_): test_obj = nipyapi.canvas.get_processor(processor_.id, 'id') if test_obj.status.aggregate_snapshot.active_thread_count == 0: return True log.info("Processor not stopped, active thread count %s", test_obj.status.aggregate_snapshot.active_thread_count) return False def _starting_schedule_processor(processor_): test_obj = nipyapi.canvas.get_processor(processor_.id, 'id') if test_obj.component.state == 'RUNNING': return True log.info("Processor not started, run_status %s", test_obj.component.state) return False assert isinstance(scheduled, bool) if refresh: target = nipyapi.canvas.get_processor(processor.id, 'id') assert isinstance(target, nipyapi.nifi.ProcessorEntity) else: target = processor result = schedule_components( pg_id=target.status.group_id, scheduled=scheduled, components=[target] ) # If target scheduled state was successfully updated if result: # If we want to stop the processor if not scheduled: # Test that the processor threads have halted stop_test = nipyapi.utils.wait_to_complete( _running_schedule_processor, target ) if stop_test: # Return True if we stopped the processor return result # Return False if we scheduled a stop, but it didn't stop return False # Test that the Processor started start_test = nipyapi.utils.wait_to_complete( _starting_schedule_processor, target ) if start_test: return result return False
[docs]def update_processor(processor, update): """ Updates configuration parameters for a given Processor. An example update would be: nifi.ProcessorConfigDTO(scheduling_period='3s') Args: processor (ProcessorEntity): The Processor to target for update update (ProcessorConfigDTO): The new configuration parameters Returns: (ProcessorEntity): The updated ProcessorEntity """ if not isinstance(update, nipyapi.nifi.ProcessorConfigDTO): raise ValueError( "update param is not an instance of nifi.ProcessorConfigDTO" ) try: return nipyapi.nifi.ProcessorsApi().update_processor( id=processor.id, body=nipyapi.nifi.ProcessorEntity( component=nipyapi.nifi.ProcessorDTO( config=update, id=processor.id ), revision=processor.revision, ) ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def get_variable_registry(process_group, ancestors=True): """ Gets the contents of the variable registry attached to a Process Group Args: process_group (ProcessGroupEntity): The Process Group to retrieve the Variable Registry from ancestors (bool): Whether to include the Variable Registries from child Process Groups Returns: (VariableRegistryEntity): The Variable Registry """ try: return nipyapi.nifi.ProcessGroupsApi().get_variable_registry( process_group.id, include_ancestor_groups=ancestors ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def update_variable_registry(process_group, update): """ Updates one or more key:value pairs in the variable registry Args: process_group (ProcessGroupEntity): The Process Group which has the Variable Registry to be updated update (tuple[key, value]): The variables to write to the registry Returns: (VariableRegistryEntity): The created or updated Variable Registry Entries """ if not isinstance(process_group, nipyapi.nifi.ProcessGroupEntity): raise ValueError( 'param process_group is not a valid nifi.ProcessGroupEntity' ) if not isinstance(update, list): raise ValueError( 'param update is not a valid list of (key,value) tuples' ) # Parse variable update into the datatype var_update = [ nipyapi.nifi.VariableEntity( nipyapi.nifi.VariableDTO( name=li[0], value=li[1], process_group_id=process_group.id ) ) for li in update ] try: return nipyapi.nifi.ProcessGroupsApi().update_variable_registry( id=process_group.id, body=nipyapi.nifi.VariableRegistryEntity( process_group_revision=process_group.revision, variable_registry=nipyapi.nifi.VariableRegistryDTO( process_group_id=process_group.id, variables=var_update ) ) ) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def get_connections(pg_id): """ EXPERIMENTAL List all child connections within a given Process Group Args: pg_id (str): The UUID of the target Process Group Returns: (ConnectionsEntity): A native datatype which contains the list of all Connections in the Process Group """ assert isinstance( get_process_group(pg_id, 'id'), nipyapi.nifi.ProcessGroupEntity ) try: out = nipyapi.nifi.ProcessGroupsApi().get_connections(pg_id) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) assert isinstance(out, nipyapi.nifi.ConnectionsEntity) return out
[docs]def purge_connection(con_id): """ EXPERIMENTAL Drops all FlowFiles in a given connection. Waits until the action is complete before returning. Note that if upstream component isn't stopped, more data may flow into the connection after this action. Args: con_id (str): The UUID of the Connection to be purged Returns: (DropRequestEntity): The status reporting object for the drop request. """ # TODO: Reimplement to batched instead of single threaded def _autumn_leaves(con_id_, drop_request_): test_obj = nipyapi.nifi.FlowfileQueuesApi().get_drop_request( con_id_, drop_request_.drop_request.id ).drop_request if not test_obj.finished: return False if test_obj.failure_reason: raise ValueError( "Unable to complete drop request{0}, error was {1}" .format( test_obj, test_obj.drop_request.failure_reason ) ) return True try: drop_req = nipyapi.nifi.FlowfileQueuesApi().create_drop_request(con_id) except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body) assert isinstance(drop_req, nipyapi.nifi.DropRequestEntity) return nipyapi.utils.wait_to_complete(_autumn_leaves, con_id, drop_req)
[docs]def purge_process_group(process_group, stop=False): """ EXPERIMENTAL Purges the connections in a given Process Group of FlowFiles, and optionally stops it first Args: process_group (ProcessGroupEntity): Target Process Group stop (Optional [bool]): Whether to stop the Process Group before action Returns: (list[dict{ID:True|False}]): Result set. A list of Dicts of Connection IDs mapped to True or False for success of each connection """ assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity) assert isinstance(stop, bool) if stop: if not schedule_process_group(process_group.id, False): raise ValueError( "Unable to stop Process Group {0} for purging" .format(process_group.id) ) cons = get_connections(process_group.id) result = [] for con in cons.connections: result.append({con.id: str(purge_connection(con.id))}) return result
[docs]def get_bulletins(): """ Retrieves current bulletins (alerts) from the Flow Canvas Returns: (ControllerBulletinsEntity): The native datatype containing a list of bulletins """ try: return nipyapi.nifi.FlowApi().get_bulletins() except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)
[docs]def get_bulletin_board(): """ Retrieves the bulletin board object Returns: (BulletinBoardEntity): The native datatype BulletinBoard object """ try: return nipyapi.nifi.FlowApi().get_bulletin_board() except nipyapi.nifi.rest.ApiException as e: raise ValueError(e.body)