Source code for nipyapi.canvas

# -*- coding: utf-8 -*-

"""
For interactions with the NiFi Canvas.
"""

from __future__ import absolute_import
import six
import nipyapi

__all__ = [
    "get_root_pg_id", "recurse_flow", "get_flow", "get_process_group_status",
    "get_process_group", "list_all_process_groups", "delete_process_group",
    "schedule_process_group", "create_process_group", "list_all_processors",
    "list_all_processor_types", "get_processor_type", 'create_processor',
    'delete_processor', 'get_processor', 'schedule_processor',
    'update_processor', 'get_variable_registry', 'update_variable_registry',
    'get_connections', 'purge_connection', 'purge_process_group',
    'get_bulletins', 'get_bulletin_board'
]


def get_root_pg_id():
    """
    Convenience function to return the UUID of the Root Process Group

    Returns (str): The UUID of the root PG
    """
    return nipyapi.nifi.FlowApi().get_process_group_status('root')\
        .process_group_status.id

def recurse_flow(pg_id='root'):
    """
    Returns information about a Process Group and all its Child Flows.
    Recurses the child flows by appending each process group with a
    'nipyapi_extended' parameter which contains the child process groups, etc.

    Args:
        pg_id (str): The Process Group UUID

    Returns:
        (ProcessGroupFlowEntity): enriched NiFi Flow object
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"

    def _walk_flow(node):
        """This recursively extends the ProcessGroupEntity to contain the
        ProcessGroupFlowEntity of each of its child process groups,
        so you can have the entire canvas in a single object.
        """
        if isinstance(node, nipyapi.nifi.ProcessGroupFlowEntity):
            for pg in node.process_group_flow.flow.process_groups:
                pg.__setattr__(
                    'nipyapi_extended',
                    recurse_flow(pg.id)
                )
            return node

    return _walk_flow(get_flow(pg_id))

def get_flow(pg_id='root'):
    """
    Returns information about a Process Group and flow.

    This surfaces the native implementation; for the recursed implementation
    see 'recurse_flow'

    Args:
        pg_id (str): id of the Process Group to retrieve, defaults to the
            root process group if not set

    Returns:
        (ProcessGroupFlowEntity): The Process Group object
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    try:
        return nipyapi.nifi.FlowApi().get_flow(pg_id)
    except nipyapi.nifi.rest.ApiException as err:
        raise ValueError(err.body)

def get_process_group_status(pg_id='root', detail='names'):
    """
    Returns an entity containing the status of the Process Group.
    Optionally may be configured to return a simple dict of name:id pairings

    Note that there is also a 'process group status' command available, but
    it returns a subset of this data anyway, and this call is more useful

    Args:
        pg_id (str): The UUID of the Process Group
        detail (str): 'names' or 'all'; whether to return a simple dict of
            name:id pairings, or the full details. Defaults to 'names'

    Returns:
        (ProcessGroupEntity): The Process Group Entity including the status
    """
    assert isinstance(pg_id, six.string_types), "pg_id should be a string"
    assert detail in ['names', 'all'], "detail should be either 'names' or " \
                                       "'all'"
    raw = nipyapi.nifi.ProcessGroupsApi().get_process_group(id=pg_id)
    if detail == 'names':
        out = {
            raw.component.name: raw.component.id
        }
        return out
    return raw

def get_process_group(identifier, identifier_type='name'):
    """
    Filters the list of all Process Groups against a given identifier string
    occurring in a given identifier_type field.

    Args:
        identifier (str): the string to filter the list for
        identifier_type (str): the field to filter on, set in config.py

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    try:
        if identifier_type == 'id':
            # assuming unique fetch of pg id
            # implementing separately to avoid recursing entire canvas
            out = nipyapi.nifi.ProcessGroupsApi().get_process_group(identifier)
        else:
            obj = list_all_process_groups()
            out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    return out

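# Usage sketch (illustrative, not part of the original module): fetching a
# Process Group by name or by UUID using the function above. The name
# 'my_flow' below is hypothetical.
#
#   pg = get_process_group('my_flow')       # None, a single entity, or a list
#   pg = get_process_group(pg.id, 'id')     # direct, unique fetch by UUID
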
def list_all_process_groups():
    """
    Returns a flattened list of all Process Groups on the canvas.
    Potentially slow if you have a large canvas.

    Note that the ProcessGroupsApi().get_process_groups(pg_id) command only
    provides the first layer of pgs, whereas this trawls the entire canvas

    Returns:
        list[ProcessGroupEntity]
    """

    def flatten(parent_pg):
        """
        Recursively flattens the native datatypes into a generic list.
        Note that the root is a special case as it has no parent

        Args:
            parent_pg (ProcessGroupEntity): object to flatten

        Yields:
            Generator for all ProcessGroupEntities, eventually
        """
        for child_pg in parent_pg.process_group_flow.flow.process_groups:
            for sub in flatten(child_pg.nipyapi_extended):
                yield sub
            yield child_pg

    root_flow = recurse_flow('root')
    out = list(flatten(root_flow))
    # This duplicates the nipyapi_extended structure to the root case
    pga_handle = nipyapi.nifi.ProcessGroupsApi()
    root_entity = pga_handle.get_process_group('root')
    root_entity.__setattr__('nipyapi_extended', root_flow)
    out.append(root_entity)
    return out

def list_all_processors():
    """
    Returns a flat list of all Processors anywhere on the canvas

    Returns:
        list[ProcessorEntity]
    """

    def flattener():
        """
        Memory-efficient flattener, sort of.
        :return: yields a ProcessorEntity
        """
        for pg in list_all_process_groups():
            for proc in pg.nipyapi_extended.process_group_flow.flow.processors:
                yield proc

    return list(flattener())

def schedule_process_group(process_group_id, scheduled):
    """
    Start or Stop a Process Group and all components.

    Note that this doesn't guarantee that all components have started, as
    some may be in Invalid states.

    Args:
        process_group_id (str): The UUID of the target Process Group
        scheduled (bool): True to start, False to stop

    Returns:
        (bool): True if successfully scheduled, False if not
    """
    assert isinstance(process_group_id, six.string_types)
    assert isinstance(scheduled, bool)

    def _running_schedule_process_group(pg_id_):
        test_obj = nipyapi.nifi.ProcessGroupsApi().get_process_group(pg_id_)
        if test_obj.status.aggregate_snapshot.active_thread_count == 0:
            return True
        return False

    assert isinstance(
        get_process_group(process_group_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    result = schedule_components(
        pg_id=process_group_id,
        scheduled=scheduled
    )
    # If target scheduled state was successfully updated
    if result:
        # If we want to stop the Process Group
        if not scheduled:
            # Test that the component threads have halted
            stop_test = nipyapi.utils.wait_to_complete(
                _running_schedule_process_group,
                process_group_id
            )
            if stop_test:
                # Return True if we stopped the Process Group
                return result
            # Return False if we scheduled a stop, but it didn't stop
            return False
    # Return the True or False result if we were trying to start it
    return result

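# Usage sketch (illustrative): stopping a Process Group and checking the
# outcome. The pg object is assumed to come from get_process_group above.
#
#   if not schedule_process_group(pg.id, scheduled=False):
#       print("Process Group did not stop cleanly")
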
def delete_process_group(process_group, force=False, refresh=True):
    """
    Deletes a given Process Group, with optional prejudice.

    Args:
        process_group (ProcessGroupEntity): The target Process Group
        force (bool): Stop, purge and clean the target Process Group before
            deletion. Experimental.
        refresh (bool): Whether to refresh the state first

    Returns:
        (ProcessGroupEntity): The updated object state
    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(force, bool)
    assert isinstance(refresh, bool)
    if refresh or force:
        target = nipyapi.nifi.ProcessGroupsApi().get_process_group(
            process_group.id
        )
    else:
        target = process_group
    if force:
        # Stop, drop, and roll.
        purge_process_group(target, stop=True)
        # Remove templates
        for template in nipyapi.templates.list_all_templates().templates:
            if target.id == template.template.group_id:
                nipyapi.templates.delete_template(template.id)
    try:
        return nipyapi.nifi.ProcessGroupsApi().remove_process_group(
            id=target.id,
            version=target.revision.version,
            client_id=target.revision.client_id
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

def create_process_group(parent_pg, new_pg_name, location):
    """
    Creates a new Process Group with the given name under the provided
    parent Process Group at the given Location

    Args:
        parent_pg (ProcessGroupEntity): The parent Process Group to create
            the new process group in
        new_pg_name (str): The name of the new Process Group
        location (tuple[x, y]): the x,y coordinates to place the new Process
            Group under the parent

    Returns:
        (ProcessGroupEntity): The new Process Group
    """
    assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(new_pg_name, six.string_types)
    assert isinstance(location, tuple)
    try:
        return nipyapi.nifi.ProcessGroupsApi().create_process_group(
            id=parent_pg.id,
            body=nipyapi.nifi.ProcessGroupEntity(
                revision=parent_pg.revision,
                component=nipyapi.nifi.ProcessGroupDTO(
                    name=new_pg_name,
                    position=nipyapi.nifi.PositionDTO(
                        x=float(location[0]),
                        y=float(location[1])
                    )
                )
            )
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise e

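# Usage sketch (illustrative): creating a child Process Group under the root
# canvas. The name 'demo_group' and the coordinates are hypothetical.
#
#   root_pg = get_process_group(get_root_pg_id(), 'id')
#   new_pg = create_process_group(root_pg, 'demo_group', (400.0, 200.0))
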
def list_all_processor_types():
    """
    Produces the list of all available processor types in the NiFi instance

    Returns:
        list(ProcessorTypesEntity): A native datatype containing the
            processors list
    """
    try:
        return nipyapi.nifi.FlowApi().get_processor_types()
    except nipyapi.nifi.rest.ApiException as e:
        raise e

def get_processor_type(identifier, identifier_type='name'):
    """
    Gets the abstract object describing a Processor, or list thereof

    Args:
        identifier (str): the string to filter the list for
        identifier_type (str): the field to filter on, set in config.py

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    try:
        obj = list_all_processor_types().processor_types
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    if obj:
        return nipyapi.utils.filter_obj(obj, identifier, identifier_type)
    return obj

def create_processor(parent_pg, processor, location, name=None, config=None):
    """
    Instantiates a given processor on the canvas

    Args:
        parent_pg (ProcessGroupEntity): The parent Process Group
        processor (DocumentedTypeDTO): The abstract processor type object to
            be instantiated
        location (tuple[x, y]): The location coordinates
        name (Optional [str]): The name for the new Processor
        config (Optional [ProcessorConfigDTO]): A configuration object for
            the new processor

    Returns:
        (ProcessorEntity): The new Processor
    """
    if name is None:
        processor_name = processor.type.split('.')[-1]
    else:
        processor_name = name
    if config is None:
        target_config = nipyapi.nifi.ProcessorConfigDTO()
    else:
        target_config = config
    try:
        return nipyapi.nifi.ProcessGroupsApi().create_processor(
            id=parent_pg.id,
            body=nipyapi.nifi.ProcessorEntity(
                revision={'version': 0},
                component=nipyapi.nifi.ProcessorDTO(
                    position=nipyapi.nifi.PositionDTO(
                        x=float(location[0]),
                        y=float(location[1])
                    ),
                    type=processor.type,
                    name=processor_name,
                    config=target_config
                )
            )
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

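# Usage sketch (illustrative): instantiating a standard GenerateFlowFile
# processor inside the Process Group created above. The type lookup uses
# get_processor_type; the name and scheduling period are arbitrary examples.
#
#   proc_type = get_processor_type('GenerateFlowFile')
#   new_proc = create_processor(
#       parent_pg=new_pg,
#       processor=proc_type,
#       location=(400.0, 400.0),
#       name='demo_generator',
#       config=nipyapi.nifi.ProcessorConfigDTO(scheduling_period='5s')
#   )
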
def get_processor(identifier, identifier_type='name'):
    """
    Filters the list of all Processors against the given identifier string
    in the given identifier_type field

    Args:
        identifier (str): The String to filter against
        identifier_type (str): The field to apply the filter to. Set in
            config.py

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    try:
        if identifier_type == 'id':
            out = nipyapi.nifi.ProcessorsApi().get_processor(identifier)
        else:
            obj = list_all_processors()
            out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    return out

def delete_processor(processor, refresh=True, force=False):
    """
    Deletes a Processor from the canvas, with optional prejudice.

    Args:
        processor (ProcessorEntity): The processor to delete
        refresh (bool): Whether to refresh the Processor state before action
        force (bool): Whether to stop the Processor before deletion.
            Behavior may change in future releases. Experimental.

    Returns:
        (ProcessorEntity): The updated ProcessorEntity
    """
    assert isinstance(processor, nipyapi.nifi.ProcessorEntity)
    assert isinstance(refresh, bool)
    assert isinstance(force, bool)
    if refresh or force:
        target = get_processor(processor.id, 'id')
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    else:
        target = processor
    if force:
        if not schedule_processor(target, False):
            # raise a proper exception rather than a bare string
            raise ValueError(
                "Could not prepare processor {0} for deletion"
                .format(target.id)
            )
        target = get_processor(processor.id, 'id')
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    try:
        return nipyapi.nifi.ProcessorsApi().delete_processor(
            id=target.id,
            version=target.revision.version
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

def schedule_components(pg_id, scheduled, components=None):
    """
    Changes the scheduled target state of a list of components within a
    given Process Group.

    Note that this does not guarantee that components will be Started or
    Stopped afterwards, merely that they will have their scheduling updated.

    Args:
        pg_id (str): The UUID of the parent Process Group
        scheduled (bool): True to start, False to stop
        components (list[ComponentType]): The list of Component Entities to
            schedule, e.g. ProcessorEntity's

    Returns:
        (bool): True for success, False for not
    """
    assert isinstance(
        get_process_group(pg_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    assert isinstance(scheduled, bool)
    assert components is None or isinstance(components, list)
    target_state = 'RUNNING' if scheduled else 'STOPPED'
    body = nipyapi.nifi.ScheduleComponentsEntity(
        id=pg_id,
        state=target_state
    )
    if components:
        body.components = {i.id: i.revision for i in components}
    try:
        result = nipyapi.nifi.FlowApi().schedule_components(
            id=pg_id,
            body=body
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    if result.state == target_state:
        return True
    return False

def schedule_processor(processor, scheduled, refresh=True):
    """
    Set a Processor to Start or Stop.

    Note that this doesn't guarantee that it will change state, merely that
    it will be instructed to try.

    Args:
        processor (ProcessorEntity): The Processor to target
        scheduled (bool): True to start, False to stop
        refresh (bool): Whether to refresh the object before action

    Returns:
        (bool): True for success, False for failure
    """
    assert isinstance(processor, nipyapi.nifi.ProcessorEntity)
    assert isinstance(scheduled, bool)
    assert isinstance(refresh, bool)

    def _running_schedule_processor(processor_):
        test_obj = nipyapi.nifi.ProcessorsApi().get_processor(processor_.id)
        if test_obj.status.aggregate_snapshot.active_thread_count == 0:
            return True
        return False

    if refresh:
        target = nipyapi.canvas.get_processor(processor.id, 'id')
        assert isinstance(target, nipyapi.nifi.ProcessorEntity)
    else:
        target = processor
    result = schedule_components(
        pg_id=target.status.group_id,
        scheduled=scheduled,
        components=[target]
    )
    # If target scheduled state was successfully updated
    if result:
        # If we want to stop the processor
        if not scheduled:
            # Test that the processor threads have halted
            stop_test = nipyapi.utils.wait_to_complete(
                _running_schedule_processor,
                target
            )
            if stop_test:
                # Return True if we stopped the processor
                return result
            # Return False if we scheduled a stop, but it didn't stop
            return False
    # Return the True or False result if we were trying to start the processor
    return result

def update_processor(processor, update):
    """
    Updates configuration parameters for a given Processor.

    An example update would be:
    nifi.ProcessorConfigDTO(scheduling_period='3s')

    Args:
        processor (ProcessorEntity): The Processor to target for update
        update (ProcessorConfigDTO): The new configuration parameters

    Returns:
        (ProcessorEntity): The updated ProcessorEntity
    """
    if not isinstance(update, nipyapi.nifi.ProcessorConfigDTO):
        raise ValueError(
            "update param is not an instance of nifi.ProcessorConfigDTO"
        )
    try:
        return nipyapi.nifi.ProcessorsApi().update_processor(
            id=processor.id,
            body=nipyapi.nifi.ProcessorEntity(
                component=nipyapi.nifi.ProcessorDTO(
                    config=update,
                    id=processor.id
                ),
                revision=processor.revision,
            )
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

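# Usage sketch (illustrative): applying the example update from the docstring
# to an existing processor fetched by name. 'demo_generator' is hypothetical.
#
#   proc = get_processor('demo_generator')
#   update_processor(
#       proc,
#       nipyapi.nifi.ProcessorConfigDTO(scheduling_period='3s')
#   )
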
def get_variable_registry(process_group, ancestors=True):
    """
    Gets the contents of the variable registry attached to a Process Group

    Args:
        process_group (ProcessGroupEntity): The Process Group to retrieve the
            Variable Registry from
        ancestors (bool): Whether to include the Variable Registries from
            ancestor Process Groups

    Returns:
        (VariableRegistryEntity): The Variable Registry
    """
    try:
        return nipyapi.nifi.ProcessGroupsApi().get_variable_registry(
            process_group.id,
            include_ancestor_groups=ancestors
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

def update_variable_registry(process_group, update):
    """
    Updates one or more key:value pairs in the variable registry

    Args:
        process_group (ProcessGroupEntity): The Process Group which has the
            Variable Registry to be updated
        update (list[tuple[key, value]]): The variables to write to the
            registry

    Returns:
        (VariableRegistryEntity): The created or updated Variable Registry
            Entries
    """
    if not isinstance(process_group, nipyapi.nifi.ProcessGroupEntity):
        raise ValueError(
            'param process_group is not a valid nifi.ProcessGroupEntity'
        )
    if not isinstance(update, list):
        raise ValueError(
            'param update is not a valid list of (key, value) tuples'
        )
    # Parse the variable update into the native datatype
    var_update = [
        nipyapi.nifi.VariableEntity(
            nipyapi.nifi.VariableDTO(
                name=li[0],
                value=li[1],
                process_group_id=process_group.id
            )
        ) for li in update
    ]
    try:
        return nipyapi.nifi.ProcessGroupsApi().update_variable_registry(
            id=process_group.id,
            body=nipyapi.nifi.VariableRegistryEntity(
                process_group_revision=process_group.revision,
                variable_registry=nipyapi.nifi.VariableRegistryDTO(
                    process_group_id=process_group.id,
                    variables=var_update
                )
            )
        )
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

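# Usage sketch (illustrative): writing two variables to a Process Group's
# registry. The keys and values below are arbitrary examples.
#
#   update_variable_registry(pg, [('env', 'dev'), ('retention', '7d')])
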
def get_connections(pg_id):
    """
    EXPERIMENTAL
    List all child connections within a given Process Group

    Args:
        pg_id (str): The UUID of the target Process Group

    Returns:
        (ConnectionsEntity): A native datatype which contains the list of
            all Connections in the Process Group
    """
    assert isinstance(
        get_process_group(pg_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    try:
        out = nipyapi.nifi.ProcessGroupsApi().get_connections(pg_id)
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    assert isinstance(out, nipyapi.nifi.ConnectionsEntity)
    return out

def purge_connection(con_id):
    """
    EXPERIMENTAL
    Drops all FlowFiles in a given connection. Waits until the action is
    complete before returning.

    Note that if the upstream component isn't stopped, more data may flow
    into the connection after this action.

    Args:
        con_id (str): The UUID of the Connection to be purged

    Returns:
        (DropRequestEntity): The status reporting object for the drop
            request.
    """
    # TODO: Reimplement as batched rather than single-threaded

    def _autumn_leaves(con_id_, drop_request_):
        test_obj = nipyapi.nifi.FlowfileQueuesApi().get_drop_request(
            con_id_,
            drop_request_.drop_request.id
        ).drop_request
        if not test_obj.finished:
            return False
        if test_obj.failure_reason:
            raise ValueError(
                "Unable to complete drop request {0}, error was {1}"
                .format(test_obj, test_obj.failure_reason)
            )
        return True

    try:
        drop_req = nipyapi.nifi.FlowfileQueuesApi().create_drop_request(con_id)
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)
    assert isinstance(drop_req, nipyapi.nifi.DropRequestEntity)
    return nipyapi.utils.wait_to_complete(_autumn_leaves, con_id, drop_req)

def purge_process_group(process_group, stop=False):
    """
    EXPERIMENTAL
    Purges the connections in a given Process Group of FlowFiles, and
    optionally stops it first

    Args:
        process_group (ProcessGroupEntity): Target Process Group
        stop (Optional [bool]): Whether to stop the Process Group before
            action

    Returns:
        (list[dict{ID:True|False}]): Result set. A list of Dicts of
            Connection IDs mapped to True or False for success of each
            connection
    """
    assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
    assert isinstance(stop, bool)
    if stop:
        if not schedule_process_group(process_group.id, False):
            raise ValueError(
                "Unable to stop Process Group {0} for purging"
                .format(process_group.id)
            )
    cons = get_connections(process_group.id)
    result = []
    for con in cons.connections:
        result.append({con.id: str(purge_connection(con.id))})
    return result

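# Usage sketch (illustrative): stopping and emptying every connection in a
# Process Group before deleting it with prejudice, using the functions above.
#
#   purge_process_group(pg, stop=True)
#   delete_process_group(pg, force=True)
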
def get_bulletins():
    """
    Retrieves current bulletins (alerts) from the Flow Canvas

    Returns:
        (ControllerBulletinsEntity): The native datatype containing a list
            of bulletins
    """
    try:
        return nipyapi.nifi.FlowApi().get_bulletins()
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)

def get_bulletin_board():
    """
    Retrieves the bulletin board object

    Returns:
        (BulletinBoardEntity): The native datatype BulletinBoard object
    """
    try:
        return nipyapi.nifi.FlowApi().get_bulletin_board()
    except nipyapi.nifi.rest.ApiException as e:
        raise ValueError(e.body)