Canvas
Canvas operations and flow management
For interactions with the NiFi Canvas.
- nipyapi.canvas.create_connection(source, target, relationships=None, name=None)[source]
Creates a connection between two objects for the given relationships
- Parameters:
source – Object to initiate the connection, e.g. ProcessorEntity
target – Object to terminate the connection, e.g. FunnelEntity
relationships (list) – list of strings of relationships to connect, may be collected from the object ‘relationships’ property (optional)
name (str) – Name for the Connection, defaults to None (optional)
- Returns:
The created connection
- Return type:
ConnectionEntity
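A minimal sketch of wiring two components together with create_connection. The `canvas` argument is assumed to be the `nipyapi.canvas` module, passed in explicitly so the helper can be exercised without a live NiFi instance. The helper name `connect_on_success` and the `'success'` relationship are illustrative assumptions, not part of the library.

```python
def connect_on_success(canvas, source, target, name=None):
    """Connect source to target for the 'success' relationship only.

    `canvas` is assumed to be the nipyapi.canvas module. 'success' is an
    example relationship name; the source component must actually expose it.
    """
    return canvas.create_connection(
        source,
        target,
        relationships=["success"],
        name=name,
    )
```

Omitting `relationships` falls back to the documented default of None.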
- nipyapi.canvas.create_controller(parent_pg, controller, name=None)[source]
Creates a new Controller Service in a given Process Group of the given Controller type, with the given Name.
- Parameters:
parent_pg (ProcessGroupEntity) – Target Parent PG
controller (DocumentedTypeDTO) – Type of Controller to create, found via the list_all_controller_types method
name (str, optional) – Name for the new Controller as a String
- Returns:
The created controller service
- Return type:
- nipyapi.canvas.create_funnel(pg_id, position=None)[source]
Creates a Funnel Object
- Parameters:
- Returns:
Created Funnel
- Return type:
- nipyapi.canvas.create_port(pg_id, port_type, name, state, position=None)[source]
Creates a new input or output port of given characteristics
- Parameters:
- Returns:
(PortEntity) of the created port
- nipyapi.canvas.create_process_group(parent_pg, new_pg_name, location, comment='')[source]
Creates a new Process Group with the given name under the provided parent Process Group at the given Location
- Parameters:
parent_pg (ProcessGroupEntity) – The parent Process Group to create the new process group in
new_pg_name (str) – The name of the new Process Group
location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
comment (str) – Entry for the Comments field
- Returns:
The new Process Group
- Return type:
- nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None)[source]
Instantiates a given processor on the canvas
- Parameters:
parent_pg (ProcessGroupEntity) – The parent Process Group
processor (DocumentedTypeDTO) – The abstract processor type object to be instantiated
location (tuple[x, y]) – The location coordinates
name (Optional [str]) – The name for the new Processor
config (Optional [ProcessorConfigDTO]) – A configuration object for the new processor
- Returns:
The new Processor
- Return type:
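The creation functions above can be combined into a small flow. The sketch below assumes `canvas` is the `nipyapi.canvas` module and that get_processor_type returns None or a list when the lookup is not unique (the list case is documented, the None case is an assumption). The helper name, the default names and the coordinates are illustrative only.

```python
def build_minimal_flow(canvas, pg_name="example_pg", processor_type="GenerateFlowFile"):
    """Create a Process Group under root and place one processor inside it.

    `canvas` is assumed to be the nipyapi.canvas module.
    """
    # Resolve the processor type first so nothing is created if it is ambiguous.
    proc_type = canvas.get_processor_type(processor_type)
    if proc_type is None or isinstance(proc_type, list):
        raise LookupError(
            f"Expected exactly one processor type for {processor_type!r}"
        )

    # Look up the root Process Group entity by its UUID.
    root_id = canvas.get_root_pg_id()
    root_pg = canvas.get_process_group(root_id, identifier_type="id")

    # Create the new group, then instantiate the processor inside it.
    new_pg = canvas.create_process_group(root_pg, pg_name, location=(400.0, 400.0))
    processor = canvas.create_processor(
        new_pg,
        proc_type,
        location=(400.0, 200.0),
        name=f"{pg_name}_source",
    )
    return new_pg, processor
```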
- nipyapi.canvas.create_remote_process_group(target_uris, transport='RAW', pg_id='root', position=None)[source]
Creates a new Remote Process Group with given parameters
- nipyapi.canvas.delete_connection(connection, purge=False)[source]
Deletes a connection, optionally purges it first
- Parameters:
connection (ConnectionEntity) – Connection to delete
purge (bool) – True to Purge, Defaults to False
- Returns:
the modified Connection
- Return type:
- nipyapi.canvas.delete_controller(controller, force=False)[source]
Delete a Controller service, with optional prejudice
- Parameters:
controller (ControllerServiceEntity) – Target Controller to delete
force (bool) – True to attempt to disable the Controller before deletion
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.delete_funnel(funnel, refresh=True)[source]
Deletes a Funnel Object
- Parameters:
funnel (FunnelEntity) – The Funnel to delete
refresh (bool) – Whether to refresh the object state before execution
- Returns:
Deleted FunnelEntity reference
- Return type:
- nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True)[source]
Deletes a given Process Group, with optional prejudice.
- Parameters:
process_group (ProcessGroupEntity) – The target Process Group
force (bool) – Stop, purge and clean the target Process Group before deletion. Experimental.
refresh (bool) – Whether to refresh the state first
- Returns:
The updated object state
- Return type:
- nipyapi.canvas.delete_processor(processor, refresh=True, force=False)[source]
Deletes a Processor from the canvas, with optional prejudice.
- Parameters:
processor (ProcessorEntity) – The processor to delete
refresh (bool) – Whether to refresh the Processor state before action
force (bool) – Whether to stop, purge and remove connections to the Processor before deletion. Behavior may change in future releases.
- Returns:
The updated ProcessorEntity
- Return type:
- nipyapi.canvas.delete_remote_process_group(rpg, refresh=True)[source]
Deletes a given remote process group
- Parameters:
rpg (RemoteProcessGroupEntity) – Remote Process Group to remove
refresh (bool) – Whether to refresh the object before action
- Returns:
(RemoteProcessGroupEntity)
- nipyapi.canvas.get_bulletin_board()[source]
Retrieves the bulletin board object
- Returns:
The native datatype BulletinBoard object
- Return type:
- nipyapi.canvas.get_bulletins()[source]
Retrieves current bulletins (alerts) from the Flow Canvas
- Returns:
The native datatype containing a list of bulletins
- nipyapi.canvas.get_component_connections(component)[source]
Returns list of Connections related to a given Component, e.g. Processor
- Parameters:
component – Component Object to filter by, e.g. a ProcessorEntity
- Returns:
List of ConnectionEntity Objects
- Return type:
(list)
- nipyapi.canvas.get_controller(identifier, identifier_type='name', bool_response=False, include_reporting_tasks=True)[source]
Retrieve a given Controller
- Parameters:
identifier (str) – ID or Name of a Controller to find
identifier_type (str) – ‘id’ or ‘name’, defaults to name
bool_response (bool) – If True, returns False when the Controller is not found; useful when testing for deletion completion
include_reporting_tasks (bool) – If True, will include Reporting Tasks in the search
Returns:
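The bool_response option lends itself to polling after a delete. The following is a sketch only: `canvas` is assumed to be the `nipyapi.canvas` module, and the helper name, attempt count and delay are hypothetical choices rather than library defaults.

```python
import time


def wait_for_controller_removal(canvas, controller_id, attempts=10, delay=1.0):
    """Poll until a Controller can no longer be found by ID.

    Returns True once get_controller reports False (not found), or False
    if the Controller is still present after all attempts.
    """
    for _ in range(attempts):
        found = canvas.get_controller(
            controller_id,
            identifier_type="id",
            bool_response=True,
        )
        if found is False:
            return True
        time.sleep(delay)
    return False
```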
- nipyapi.canvas.get_flow(pg_id='root')[source]
Returns information about a Process Group and flow.
This surfaces the native implementation, for the recursed implementation see ‘recurse_flow’
- Parameters:
pg_id (str) – id of the Process Group to retrieve, defaults to the root process group if not set
- Returns:
The Process Group object
- Return type:
- nipyapi.canvas.get_pg_parents_ids(pg_id)[source]
Retrieve the ids of the parent Process Groups.
- Parameters:
pg_id (str) – Process group id
- Returns:
(list) List of ids of the input PG parents
- nipyapi.canvas.get_process_group(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all process groups against a given identifier string occurring in a given identifier_type field.
- nipyapi.canvas.get_process_group_status(pg_id='root', detail='names')[source]
Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings
Note that a separate ‘process group status’ command is also available, but it returns only a subset of this data, so this call is generally more useful.
- Parameters:
- Returns:
The Process Group Entity including the status
- Return type:
- nipyapi.canvas.get_processor(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all Processors against the given identifier string in the given identifier_type field
- Parameters:
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
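Because the result may be None, a single object, or a list, calling code often normalises it before iterating. A minimal sketch, where `as_list` is a hypothetical helper and not part of nipyapi:

```python
def as_list(result):
    """Normalise a None / single object / list lookup result to a list."""
    if result is None:
        return []
    if isinstance(result, list):
        return result
    return [result]
```

For example, `as_list(nipyapi.canvas.get_processor("MyProcessor"))` could then be looped over regardless of how many processors matched.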
- nipyapi.canvas.get_processor_type(identifier, identifier_type='name', greedy=True)[source]
Gets the abstract object describing a Processor, or list thereof
- nipyapi.canvas.get_remote_process_group(rpg_id, summary=False)[source]
Fetch a remote process group object, with optional summary of just ports
- nipyapi.canvas.get_root_pg_id()[source]
Convenience function to return the UUID of the Root Process Group
Returns (str): The UUID of the root PG
- nipyapi.canvas.get_variable_registry(process_group, ancestors=True)[source]
Gets the contents of the variable registry attached to a Process Group
- Parameters:
process_group (ProcessGroupEntity) – The Process Group to retrieve the Variable Registry from
ancestors (bool) – Whether to include the Variable Registries from ancestor (parent) Process Groups
- Returns:
The Variable Registry
- Return type:
VariableRegistryEntity
- nipyapi.canvas.list_all_by_kind(kind, pg_id='root', descendants=True)[source]
Retrieves a list of all instances of a supported object type
- Parameters:
- Returns:
list of the Entity type of the kind, or single instance, or None
- nipyapi.canvas.list_all_connections(pg_id='root', descendants=True)[source]
Lists all connections for a given Process Group ID
- nipyapi.canvas.list_all_controller_types()[source]
Lists all Controller Service types available on the environment
- Returns:
list(DocumentedTypeDTO)
- nipyapi.canvas.list_all_controllers(pg_id='root', descendants=True, include_reporting_tasks=False)[source]
Lists all controllers under a given Process Group, defaults to Root. Optionally recurses all child Process Groups as well.
- nipyapi.canvas.list_all_funnels(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for funnels
- nipyapi.canvas.list_all_input_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for input ports
- nipyapi.canvas.list_all_output_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for output ports
- nipyapi.canvas.list_all_process_groups(pg_id='root')[source]
Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas.
Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessGroupEntity]
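A flattened list is convenient for building lookups. The sketch below groups Process Group IDs by name. It assumes `canvas` is the `nipyapi.canvas` module and that each returned entity exposes `id` and `component.name`; the helper name is hypothetical.

```python
def process_group_ids_by_name(canvas, pg_id="root"):
    """Map each Process Group name to the list of IDs carrying that name.

    Names are not unique on a canvas, so each name maps to a list.
    """
    index = {}
    for pg in canvas.list_all_process_groups(pg_id):
        index.setdefault(pg.component.name, []).append(pg.id)
    return index
```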
- nipyapi.canvas.list_all_processor_types()[source]
Produces the list of all available processor types in the NiFi instance
- Returns:
A native datatype containing the processors list
- Return type:
- nipyapi.canvas.list_all_processors(pg_id='root')[source]
Returns a flat list of all Processors under the provided Process Group
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessorEntity]
- nipyapi.canvas.list_all_remote_process_groups(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for remote process groups
- nipyapi.canvas.list_invalid_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors with Invalid Statuses
- nipyapi.canvas.list_sensitive_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors on the canvas which have sensitive properties that would need to be managed during deployment
- nipyapi.canvas.purge_connection(con_id)[source]
EXPERIMENTAL Drops all FlowFiles in a given connection. Waits until the action is complete before returning.
Note that if upstream component isn’t stopped, more data may flow into the connection after this action.
- Parameters:
con_id (str) – The UUID of the Connection to be purged
- Returns:
The status reporting object for the drop request.
- Return type:
- nipyapi.canvas.purge_process_group(process_group, stop=False)[source]
EXPERIMENTAL Purges the connections in a given Process Group of FlowFiles, and optionally stops it first
- Parameters:
process_group (ProcessGroupEntity) – Target Process Group
stop (Optional [bool]) – Whether to stop the Process Group before action
- Returns:
Result set. A list of Dicts of Connection IDs mapped to True or False for success of each connection
- Return type:
list[dict{ID: True|False}]
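Given the documented result shape (a list of dicts mapping Connection ID to True or False), a caller may want just the connections that failed to purge. A small sketch with a hypothetical helper name:

```python
def failed_purges(results):
    """Return the Connection IDs whose purge reported False."""
    failed = []
    for entry in results:
        for con_id, succeeded in entry.items():
            if not succeeded:
                failed.append(con_id)
    return failed
```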
- nipyapi.canvas.recurse_flow(pg_id='root')[source]
Returns information about a Process Group and all its Child Flows.
Recurses the child flows by appending each process group with a ‘nipyapi_extended’ parameter which contains the child process groups, etc.
Note: This previously used actual recursion, which broke on large NiFi environments; it now uses a task/list update approach.
- Parameters:
pg_id (str) – The Process Group UUID
- Returns:
enriched NiFi Flow object
- Return type:
- nipyapi.canvas.schedule_all_controllers(pg_id, scheduled)[source]
Enable or Disable all Controller Services in a Process Group.
Uses NiFi’s native bulk controller service activation API which handles all descendant controller services automatically.
- Parameters:
- Returns:
The result of the operation
- Return type:
- nipyapi.canvas.schedule_components(pg_id, scheduled, components=None)[source]
Changes the scheduled target state of a list of components within a given Process Group.
Note that this does not guarantee that components will be Started or Stopped afterwards, merely that they will have their scheduling updated.
- nipyapi.canvas.schedule_controller(controller, scheduled, refresh=False)[source]
Start/Enable or Stop/Disable a Controller Service
- Parameters:
controller (ControllerServiceEntity) – Target Controller to schedule
scheduled (bool) – True to start, False to stop
refresh (bool) – Whether to refresh the component revision before execution
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.schedule_process_group(process_group_id, scheduled)[source]
Start or Stop a Process Group and all components.
Note that this doesn’t guarantee that all components have started, as some may be in Invalid states.
- nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True)[source]
Set a Processor to Start or Stop.
Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor starts.
- Parameters:
processor (ProcessorEntity) – The Processor to target
scheduled (bool) – True to start, False to stop
refresh (bool) – Whether to refresh the object before action
- Returns:
True for success, False for failure
- Return type:
(bool)
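Since the call reports success as a bool rather than raising, a batch operation can collect the processors that did not change state. This is a sketch that assumes `canvas` is the `nipyapi.canvas` module; `stop_processors` is a hypothetical helper.

```python
def stop_processors(canvas, processors):
    """Ask each processor to stop and return those that reported failure."""
    not_stopped = []
    for processor in processors:
        # scheduled=False requests a stop; the return value is True on success.
        if not canvas.schedule_processor(processor, scheduled=False):
            not_stopped.append(processor)
    return not_stopped
```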
- nipyapi.canvas.set_remote_process_group_transmission(rpg, enable=True, refresh=True)[source]
Enable or Disable Transmission for an RPG
- Parameters:
rpg (RemoteProcessGroupEntity) – The remote process group to modify
enable (bool) – True to enable, False to disable
refresh (bool) – Whether to refresh the object before action
Returns:
- nipyapi.canvas.update_controller(controller, update)[source]
Updates the Configuration of a Controller Service
- Parameters:
controller (ControllerServiceEntity) – Target Controller to update
update (ControllerServiceDTO) – Controller Service configuration object containing the new config params and properties
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.update_process_group(pg, update, refresh=True)[source]
Updates a given Process Group.
- Parameters:
pg (ProcessGroupEntity) – The Process Group to target for update
update (dict) – key:value pairs to update
refresh (bool) – Whether to refresh the Process Group before applying the update
- Returns:
The updated ProcessGroupEntity
- Return type:
- nipyapi.canvas.update_processor(processor, update=None, name=None, refresh=True, auto_stop=False)[source]
Updates a Processor’s configuration and/or name.
For configuration changes, pass a ProcessorConfigDTO:
nifi.ProcessorConfigDTO(scheduling_period='3s')
For renaming, pass the new name. Both can be provided together.
Processors must be stopped for certain updates (including renaming). If auto_stop is True, the processor will be stopped before updating and restarted afterward if it was originally running. The default is False.
- Parameters:
processor (ProcessorEntity) – The Processor to target for update
update (ProcessorConfigDTO, optional) – Configuration parameters to update
name (str, optional) – New name for the processor
refresh (bool) – Whether to refresh the Processor object state before applying the update. Default True.
auto_stop (bool) – If True, automatically stop the processor before updating and restart afterward if it was running. Default False.
- Returns:
The updated ProcessorEntity
- Return type:
- Raises:
ValueError – If neither update nor name is provided, or if update is not a ProcessorConfigDTO, or if processor is running and auto_stop=False.
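A sketch of renaming a processor and changing its scheduling period in one call. `canvas` is assumed to be the `nipyapi.canvas` module and `config_cls` is assumed to be `nipyapi.nifi.ProcessorConfigDTO`. Both are passed in so the helper can run without a NiFi instance, and the helper name is hypothetical.

```python
def rename_and_reschedule(canvas, config_cls, processor, new_name, period):
    """Apply a new name and scheduling period to a processor together.

    auto_stop=True is passed explicitly because renaming requires the
    processor to be stopped and the documented default is False.
    """
    update = config_cls(scheduling_period=period)
    return canvas.update_processor(
        processor,
        update=update,
        name=new_name,
        auto_stop=True,
    )
```

With `auto_stop` left at its default, the same call on a running processor would be expected to raise the ValueError described above.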
- nipyapi.canvas.update_variable_registry(process_group, update, refresh=True)[source]
Updates one or more key:value pairs in the variable registry
- Parameters:
process_group (ProcessGroupEntity) – The Process Group which has the Variable Registry to be updated
update (list[tuple]) – The variables to write to the registry
refresh (bool) – Whether to refresh the object revision before updating
- Returns:
The created or updated Variable Registry Entries
- Return type:
VariableRegistryEntity
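The update argument is a list of tuples, so a dict of settings has to be converted first. A minimal sketch, assuming `canvas` is the `nipyapi.canvas` module; the helper name is hypothetical, and converting keys and values to strings is an assumption about what the registry expects.

```python
def set_variables(canvas, process_group, variables):
    """Write a dict of variables to a Process Group's variable registry."""
    # Convert {"key": value} into the documented list[tuple] form.
    update = [(str(key), str(value)) for key, value in variables.items()]
    return canvas.update_variable_registry(process_group, update)
```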