Canvas
Canvas operations and flow management
For interactions with the NiFi Canvas.
- class nipyapi.canvas.FlowSubgraph(components, connections)[source]
Bases:
tupleCreate new instance of FlowSubgraph(components, connections)
- nipyapi.canvas.clear_controller_state(controller, greedy=True, identifier_type='auto')[source]
Clear all state for a controller service.
This removes all state entries from the controller service. Use with caution as this may affect the controller’s behavior (e.g., clearing a CDC table state service will cause tables to be re-snapshotted).
Note: The controller must be DISABLED before clearing state. Attempting to clear state on an enabled controller will raise an error.
- Parameters:
- Returns:
The cleared state entity (should have 0 entries)
- Return type:
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – Controller not found, multiple matches found, or controller is enabled.
ApiException – NiFi API errors
Example:
# Disable controller first nipyapi.canvas.schedule_controller(my_controller, scheduled=False) # Clear all state nipyapi.canvas.clear_controller_state(my_controller) # Verify cleared state = nipyapi.canvas.get_controller_state(my_controller) assert state.component_state.local_state.total_entry_count == 0
- nipyapi.canvas.clear_processor_state(processor, greedy=True, identifier_type='auto')[source]
Clear all state for a processor.
This removes all state entries from the processor. Use with caution as this may affect the processor’s behavior (e.g., clearing ListFile state will cause all files to be re-listed).
- Parameters:
- Returns:
The cleared state entity (should have 0 entries)
- Return type:
- Raises:
TypeError – If processor is not a string or ProcessorEntity.
ValueError – Processor not found or multiple matches found.
ApiException – NiFi API errors
Example:
# Clear all state nipyapi.canvas.clear_processor_state(my_processor) # Verify cleared state = nipyapi.canvas.get_processor_state(my_processor) assert state.component_state.local_state.total_entry_count == 0
- nipyapi.canvas.create_connection(source, target, relationships=None, name=None, bends=None)[source]
Creates a connection between two objects for the given relationships
- Parameters:
source – Object to initiate the connection, e.g. ProcessorEntity
target – Object to terminate the connection, e.g. FunnelEntity
relationships (list) – list of strings of relationships to connect, may be collected from the object ‘relationships’ property (optional)
name (str) – Defaults to None, String of Name for Connection (optional)
bends (list) – List of PositionDTO or (x, y) tuples for connection bends. For self-loop connections (source == target), bends are auto-calculated if not provided, to ensure the loop renders correctly in the UI.
- Returns:
for the created connection
- Return type:
- nipyapi.canvas.create_controller(parent_pg, controller, name=None)[source]
Creates a new Controller Service in a given Process Group of the given Controller type, with the given Name.
- Parameters:
parent_pg (
ProcessGroupEntity) – Target Parent PGcontroller (
DocumentedTypeDTO) – Type of Controller to create, found via the list_all_controller_types methodname (str, optional) – Name for the new Controller as a String. If not provided, defaults to the short type name (e.g., “JsonTreeReader”).
- Returns:
The created controller service
- Return type:
- nipyapi.canvas.create_funnel(pg_id, position=None)[source]
Creates a Funnel Object
- Parameters:
- Returns:
Created Funnel
- Return type:
- nipyapi.canvas.create_port(pg_id, port_type, name, state, position=None)[source]
Creates a new input or output port of given characteristics
- Parameters:
- Returns:
(PortEntity) of the created port
- nipyapi.canvas.create_process_group(parent_pg, new_pg_name, location=None, comment='', greedy=True, identifier_type='auto')[source]
Creates a new Process Group with the given name under the provided parent Process Group at the given Location
- Parameters:
parent_pg (str or ProcessGroupEntity) – The parent Process Group, as a ProcessGroupEntity object, process group ID, or name. Use “root” for the root canvas.
new_pg_name (str) – The name of the new Process Group
location (tuple[x, y], optional) – the x,y coordinates to place the new Process Group. If None, uses layout.suggest_pg_position() to find a non-overlapping position.
comment (str) – Entry for the Comments field
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
The new Process Group
- Return type:
- Raises:
TypeError – If parent_pg is not a string or ProcessGroupEntity.
ValueError – If parent process group not found or multiple matches found.
- nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None, greedy=True, identifier_type='auto')[source]
Instantiates a given processor on the canvas
- Parameters:
parent_pg (str or ProcessGroupEntity) – The parent Process Group, as a ProcessGroupEntity object, process group ID, or name.
processor (DocumentedTypeDTO) – The abstract processor type object to be instantiated
name (Optional [str]) – The name for the new Processor
config (Optional [ProcessorConfigDTO]) – A configuration object for the new processor
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
The new Processor
- Return type:
- Raises:
TypeError – If parent_pg is not a string or ProcessGroupEntity.
ValueError – If parent process group not found or multiple matches found.
- nipyapi.canvas.create_remote_process_group(target_uris, transport='RAW', pg_id='root', position=None)[source]
Creates a new Remote Process Group with given parameters
- nipyapi.canvas.delete_connection(connection, purge=False)[source]
Deletes a connection, optionally purges it first
- Parameters:
connection (ConnectionEntity) – Connection to delete
purge (bool) – True to Purge, Defaults to False
- Returns:
the modified Connection
- Return type:
- nipyapi.canvas.delete_controller(controller, force=False, refresh=True, greedy=True, identifier_type='auto')[source]
Delete a Controller service, with optional prejudice
- Parameters:
controller (ControllerServiceEntity or str) – Target Controller to delete, as a ControllerServiceEntity object, controller ID, or controller name.
force (bool) – True to attempt Disable the Controller before deletion
refresh (bool) – Whether to refresh the controller to get latest revision before deletion. Defaults to True to avoid stale revision errors.
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
(ControllerServiceEntity)
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – If controller not found or multiple matches found.
- nipyapi.canvas.delete_funnel(funnel, refresh=True)[source]
Deletes a Funnel Object
- Parameters:
funnel (FunnelEntity) – The Funnel to delete
refresh (bool) – Whether to refresh the object state before execution
- Returns:
Deleted FunnelEntity reference
- Return type:
- nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True, greedy=True, identifier_type='auto')[source]
Deletes a given Process Group, with optional prejudice.
- Parameters:
process_group (ProcessGroupEntity or str) – The target Process Group, as a ProcessGroupEntity object, process group ID, or name.
force (bool) – Stop, purge and clean the target Process Group before deletion. Experimental.
refresh (bool) – Whether to refresh the state first
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
The updated object state
- Return type:
- Raises:
TypeError – If process_group is not a string or ProcessGroupEntity.
ValueError – If process group not found or multiple matches found.
- nipyapi.canvas.delete_processor(processor, refresh=True, force=False)[source]
Deletes a Processor from the canvas, with optional prejudice.
- Parameters:
processor (ProcessorEntity) – The processor to delete
refresh (bool) – Whether to refresh the Processor state before action
force (bool) – Whether to stop, purge and remove connections to the Processor before deletion. Behavior may change in future releases.
- Returns:
The updated ProcessorEntity
- Return type:
- nipyapi.canvas.delete_remote_process_group(rpg, refresh=True, force=False)[source]
Deletes a given remote process group.
- Parameters:
rpg (RemoteProcessGroupEntity) – Remote Process Group to remove
refresh (bool) – Whether to refresh the object before action
force (bool) – If True, stop transmission before deleting. Use this when the RPG may be transmitting and you want to ensure deletion.
- Returns:
(RemoteProcessGroupEntity)
- nipyapi.canvas.get_bulletin_board(pg_id=None, source_name=None, message=None, limit=None)[source]
Retrieves bulletins from the bulletin board with optional filtering.
This is an alias for
nipyapi.bulletins.get_bulletin_board().- Parameters:
pg_id (str, optional) – Filter to bulletins from this process group ID. If None, returns bulletins from all groups.
source_name (str, optional) – Filter by source component name (regex pattern).
message (str, optional) – Filter by message content (regex pattern).
limit (int, optional) – Maximum number of bulletins to return.
- Returns:
- List of bulletin objects matching the filters.
Returns empty list if no bulletins match.
- Return type:
- nipyapi.canvas.get_bulletins()[source]
Retrieves current bulletins (alerts) from the Flow Canvas.
This is an alias for
nipyapi.bulletins.get_bulletins().- Returns:
- The native datatype containing a list
of bulletins
- Return type:
- nipyapi.canvas.get_component_connections(component)[source]
Returns list of Connections related to a given Component, e.g. Processor
- Parameters:
component – Component Object to filter by, e.g. a ProcessorEntity
- Returns:
List of ConnectionEntity Objects
- Return type:
(list)
- nipyapi.canvas.get_connection(connection)[source]
Get a connection by ID or refresh a ConnectionEntity.
- Parameters:
connection – Either a connection UUID (str) or a ConnectionEntity object. If a ConnectionEntity is provided, fetches a fresh copy (useful for getting the latest revision before updates).
- Returns:
The requested connection
- Return type:
Example:
# Fetch by ID conn = nipyapi.canvas.get_connection("abc-123-uuid") # Refresh an existing entity fresh_conn = nipyapi.canvas.get_connection(conn)
- nipyapi.canvas.get_controller(identifier, identifier_type='name', bool_response=False, include_reporting_tasks=True, greedy=True)[source]
Retrieve a given Controller Service
- Parameters:
identifier (str) – ID or Name of a Controller to find
identifier_type (str) – ‘id’ or ‘name’, defaults to name
bool_response (bool) – If True, will return False if the Controller is not found - useful when testing for deletion completion
include_reporting_tasks (bool) – If True, will include Reporting Tasks in the search
greedy (bool) – Whether to exact match (False) or partial match (True)
- Returns:
ControllerServiceEntity or None/False depending on bool_response
- nipyapi.canvas.get_controller_service_docs(controller)[source]
Get detailed documentation for a controller service type.
This function retrieves the full ControllerServiceDefinition from NiFi, which contains comprehensive documentation useful for understanding controller service capabilities and configuration options.
- Parameters:
controller (ControllerServiceEntity or DocumentedTypeDTO or str) – An existing controller service, a type from get_controller_type(), or a type name string (e.g., “JsonTreeReader” or full qualified name).
- Returns:
- Controller
documentation including property_descriptors, tags, and more.
None: If controller type not found.
- Return type:
Example:
# From existing controller service cs = nipyapi.canvas.get_controller("MyJsonReader") docs = nipyapi.canvas.get_controller_service_docs(cs) print(docs.tags) print(docs.property_descriptors.keys()) # From controller type cs_type = nipyapi.canvas.get_controller_type("JsonTreeReader") docs = nipyapi.canvas.get_controller_service_docs(cs_type) # From type name string docs = nipyapi.canvas.get_controller_service_docs("AvroReader")
- nipyapi.canvas.get_controller_state(controller, greedy=True, identifier_type='auto')[source]
Get the state for a controller service.
Controller services can maintain internal state (e.g., cache entries, connection tracking, CDC table status). This function retrieves that state.
- Parameters:
- Returns:
ComponentStateEntity with component_state containing component_id, state_description, local_state (single node), and cluster_state (distributed). Check whichever state map has entries. Each state map has scope, total_entry_count, and state (list of StateEntryDTO).
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – Controller not found or multiple matches found.
ApiException – NiFi API errors
Example:
state = nipyapi.canvas.get_controller_state(my_controller) state_map = state.component_state.local_state if state_map and state_map.state: for entry in state_map.state: print(f"{entry.key}: {entry.value}")
- nipyapi.canvas.get_controller_type(identifier, identifier_type='name', greedy=True)[source]
Gets the abstract object describing a controller, or list thereof
- nipyapi.canvas.get_flow(pg_id='root')[source]
Returns information about a Process Group and flow.
This surfaces the native implementation, for the recursed implementation see ‘recurse_flow’
- Parameters:
pg_id (str) – id of the Process Group to retrieve, defaults to the root process group if not set
- Returns:
The Process Group object
- Return type:
- nipyapi.canvas.get_flow_components(start_component, pg_id=None)[source]
Find all components and connections in a connected flow subgraph.
Performs a breadth-first traversal of the connection graph to find the complete connected subgraph (the ‘flow’). Useful for selecting an entire flow to move or analyze as a unit. The algorithm fetches all components in one API call, builds an adjacency map, then performs BFS from the start component to find all reachable nodes.
- Parameters:
start_component – Any component entity (processor, funnel, port) to start the traversal from
pg_id – Process group ID containing the flow. If None, inferred from start_component.component.parent_group_id
- Returns:
FlowSubgraph named tuple with ‘components’ (list of component entities) and ‘connections’ (list of ConnectionEntity objects within the flow).
Example:
# Get the complete flow subgraph flow = nipyapi.canvas.get_flow_components(proc1) # Access components for c in flow.components: print(c.component.name) # Access connections (useful for transpose_flow) nipyapi.layout.transpose_flow( flow.components, offset=(400, 0), connections=flow.connections )
- nipyapi.canvas.get_flowfile_content(connection, flowfile_uuid, decode='auto', output_file=None, cluster_node_id=None)[source]
Download the content of a specific FlowFile.
This is a non-destructive operation - the FlowFile remains in the queue.
- Parameters:
connection (str or ConnectionEntity) – Connection ID or ConnectionEntity.
flowfile_uuid (str) – UUID of the FlowFile.
decode (str) – How to decode content: “auto” (mime-based), “text” (UTF-8), or “bytes” (raw).
output_file (None or bool or str) – None returns content directly, True saves to current dir with FlowFile’s filename, str saves to that path/directory.
cluster_node_id (str or None) – Node ID for clustered NiFi. If not provided, will be auto-resolved from queue listing (adds one API call).
- Returns:
bytes or str (depending on decode) If output_file is set: str path where file was saved
- Return type:
If output_file is None
Example:
# Get content as auto-detected type content = nipyapi.canvas.get_flowfile_content(connection_id, flowfile_uuid) # Force text decoding text = nipyapi.canvas.get_flowfile_content(conn, uuid, decode='text') # Save to file using FlowFile's filename path = nipyapi.canvas.get_flowfile_content(conn, uuid, output_file=True) print(f"Saved to: {path}") # Save to specific directory path = nipyapi.canvas.get_flowfile_content(conn, uuid, output_file='/tmp/') # In clustered NiFi, pass the cluster_node_id from listing flowfiles = nipyapi.canvas.list_flowfiles(connection_id) if flowfiles: content = nipyapi.canvas.get_flowfile_content( connection_id, flowfiles[0].uuid, cluster_node_id=flowfiles[0].cluster_node_id )
- nipyapi.canvas.get_flowfile_details(connection, flowfile_uuid, cluster_node_id=None)[source]
Get full details for a specific FlowFile, including its attributes.
This is a non-destructive operation - the FlowFile remains in the queue.
- Parameters:
connection – Connection ID (str) or ConnectionEntity
flowfile_uuid – UUID of the FlowFile to retrieve
cluster_node_id – Node ID for clustered NiFi. If not provided, will be auto-resolved from queue listing (adds one API call).
- Returns:
- FlowFile details including
attributes dict, filename, size, queued_duration, etc.
- Return type:
Example:
# Get FlowFile details details = nipyapi.canvas.get_flowfile_details(connection_id, flowfile_uuid) print(f"Filename: {details.filename}") print(f"Size: {details.size} bytes") print(f"Attributes: {details.attributes}") # In clustered NiFi, pass the cluster_node_id from listing flowfiles = nipyapi.canvas.list_flowfiles(connection_id) if flowfiles: details = nipyapi.canvas.get_flowfile_details( connection_id, flowfiles[0].uuid, cluster_node_id=flowfiles[0].cluster_node_id )
- nipyapi.canvas.get_pg_parents_ids(pg_id)[source]
Retrieve the ids of the parent Process Groups.
- Parameters:
pg_id (str) – Process group id
- Returns:
(list) List of ids of the input PG parents
- nipyapi.canvas.get_port(identifier, identifier_type='id')[source]
Get an Input or Output Port by identifier.
Tries input ports first, then output ports. For ID lookups, uses direct API calls. For name lookups, searches all ports in the canvas.
- Parameters:
- Returns:
The matching port. None: If no port found (name lookup only).
- Return type:
- Raises:
ValueError – If port not found (id lookup) or invalid identifier_type.
Example:
# Get by ID port = nipyapi.canvas.get_port("abc-123-def", "id") # Get by name port = nipyapi.canvas.get_port("MyInputPort", "name")
- nipyapi.canvas.get_process_group(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all process groups against a given identifier string occurring in a given identifier_type field.
- nipyapi.canvas.get_process_group_status(pg_id='root', detail='names')[source]
Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings
Note that there is also a ‘process group status’ command available, but it returns a subset of this data anyway, and this call is more useful
- Parameters:
- Returns:
The Process Group Entity including the status
- Return type:
- nipyapi.canvas.get_processor(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all Processors against the given identifier string in the given identifier_type field
- Parameters:
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.canvas.get_processor_docs(processor)[source]
Get detailed documentation for a processor, including properties, use cases, and tags.
This function retrieves the full ProcessorDefinition from NiFi, which contains comprehensive documentation useful for understanding processor capabilities and configuration options.
- Parameters:
processor (ProcessorEntity or DocumentedTypeDTO or str) – An existing processor, a processor type from get_processor_type(), or a type name string (e.g., “GenerateFlowFile” or full qualified name).
- Returns:
- Processor documentation
including property_descriptors, tags, supported_relationships, multi_processor_use_cases, dynamic_properties, and more.
None: If processor type not found.
- Return type:
Example:
# From existing processor proc = nipyapi.canvas.get_processor("MyProcessor") docs = nipyapi.canvas.get_processor_docs(proc) print(docs.tags) # ['record', 'update', 'json', ...] print(docs.property_descriptors.keys()) # From processor type proc_type = nipyapi.canvas.get_processor_type("UpdateRecord") docs = nipyapi.canvas.get_processor_docs(proc_type) # From type name string docs = nipyapi.canvas.get_processor_docs("GenerateFlowFile")
- nipyapi.canvas.get_processor_state(processor, greedy=True, identifier_type='auto')[source]
Get the state for a processor.
Processors can maintain internal state (e.g., ListFile tracks listed files, TailFile tracks file positions). This function retrieves that state.
- Parameters:
- Returns:
ComponentStateEntity with component_state containing component_id, state_description, local_state (single node), and cluster_state (distributed). Check whichever state map has entries. Each state map has scope, total_entry_count, and state (list of StateEntryDTO).
- Raises:
TypeError – If processor is not a string or ProcessorEntity.
ValueError – Processor not found or multiple matches found.
ApiException – NiFi API errors
Example:
state = nipyapi.canvas.get_processor_state(my_list_file_processor) state_map = state.component_state.local_state if state_map and state_map.state: for entry in state_map.state: print(f"{entry.key}: {entry.value}")
- nipyapi.canvas.get_processor_type(identifier, identifier_type='name', greedy=True)[source]
Gets the abstract object describing a Processor, or list thereof
- nipyapi.canvas.get_remote_process_group(rpg_id, summary=False)[source]
Fetch a remote process group object, with optional summary of just ports
- nipyapi.canvas.get_root_pg_id()[source]
Convenience function to return the UUID of the Root Process Group
Returns (str): The UUID of the root PG
- nipyapi.canvas.list_all_by_kind(kind, pg_id='root', descendants=True, greedy=True, identifier_type='auto')[source]
Retrieves a list of all instances of a supported object type
- Parameters:
kind (str) – one of input_ports, output_ports, funnels, controllers, connections, remote_process_groups
pg_id – The Process Group to list from, as a UUID string, process group name, or ProcessGroupEntity object. Defaults to root.
descendants (bool) – optional, whether to collect child group info
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
list of the Entity type of the kind, or single instance, or None
- nipyapi.canvas.list_all_connections(pg_id='root', descendants=True)[source]
Lists all connections for a given Process Group ID
- nipyapi.canvas.list_all_controller_types()[source]
Lists all Controller Service types available on the environment
- Returns:
list(DocumentedTypeDTO)
- nipyapi.canvas.list_all_controllers(pg_id='root', descendants=True, include_reporting_tasks=False, greedy=True, identifier_type='auto')[source]
Lists all controllers under a given Process Group, defaults to Root. Optionally recurses all child Process Groups as well.
- Parameters:
pg_id (str) – The Process Group to list from, as a UUID string, process group name, or ProcessGroupEntity object. Defaults to root.
descendants (bool) – True to recurse child PGs, False to not
include_reporting_tasks (bool) – True to include Reporting Tasks, False to not
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
None, ControllerServiceEntity, or list(ControllerServiceEntity)
- nipyapi.canvas.list_all_funnels(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for funnels
- nipyapi.canvas.list_all_input_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for input ports
- nipyapi.canvas.list_all_output_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for output ports
- nipyapi.canvas.list_all_process_groups(pg_id='root')[source]
Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas.
Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessGroupEntity]
- nipyapi.canvas.list_all_processor_types()[source]
Produces the list of all available processor types in the NiFi instance
- Returns:
A native datatype containing the processors list
- Return type:
- nipyapi.canvas.list_all_processors(pg_id='root')[source]
Returns a flat list of all Processors under the provided Process Group
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessorEntity]
- nipyapi.canvas.list_all_remote_process_groups(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for remote process groups
- nipyapi.canvas.list_flowfiles(connection, limit=100)[source]
List FlowFiles waiting in a connection’s queue.
This is a non-destructive operation - FlowFiles remain in the queue. Returns basic metadata for each FlowFile; use get_flowfile() to retrieve full details including attributes.
- Parameters:
connection – Connection ID (str) or ConnectionEntity
limit – Maximum number of FlowFiles to return (default 100)
- Returns:
- List of FlowFile
summaries with uuid, filename, size, queued_duration, etc. Returns empty list if queue is empty.
- Return type:
list[
FlowFileSummaryDTO]
Example:
# List FlowFiles in a connection conn = nipyapi.canvas.get_connection(connection_id) flowfiles = nipyapi.canvas.list_flowfiles(conn) for ff in flowfiles: print(f"{ff.uuid}: {ff.filename} ({ff.size} bytes, queued {ff.queued_duration}ms)") # Get first FlowFile's full details if flowfiles: details = nipyapi.canvas.get_flowfile(conn, flowfiles[0].uuid) print(details.attributes)
- nipyapi.canvas.list_invalid_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors with Invalid Statuses
- nipyapi.canvas.list_sensitive_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors on the canvas which have sensitive properties that would need to be managed during deployment
- nipyapi.canvas.peek_flowfiles(connection, limit=1)[source]
Convenience function to list and get full details for FlowFiles at front of queue.
Combines list_flowfiles() and get_flowfile() to return complete FlowFile details including attributes for the first N FlowFiles in the queue.
- Parameters:
connection – Connection ID (str) or ConnectionEntity
limit – Number of FlowFiles to retrieve details for (default 1)
- Returns:
- List of FlowFile details
with full attributes. Returns empty list if queue is empty.
- Return type:
list[
FlowFileDTO]
Example:
# Peek at the first FlowFile flowfiles = nipyapi.canvas.peek_flowfiles(connection_id) if flowfiles: ff = flowfiles[0] print(f"Filename: {ff.filename}") print(f"Attributes: {ff.attributes}") # Peek at first 5 flowfiles = nipyapi.canvas.peek_flowfiles(connection_id, limit=5)
- nipyapi.canvas.prepare_controller_config(controller, properties=None, allow_dynamic=False)[source]
Discover valid property keys or prepare a validated ControllerServiceDTO.
Two modes based on whether properties is provided: - Discovery: Call with just controller to get list of valid property keys - Prepare: Call with properties dict to get validated ControllerServiceDTO
This helps prevent silent creation of dynamic properties from typos or guessed property names.
- Property Value Semantics:
value(string): Set property to that value''(empty string): Set property to empty stringNone: For static properties, clears/unsets the value. For dynamic properties, deletes the property entirely.
- Parameters:
controller (ControllerServiceEntity or str) – The controller to inspect, as a ControllerServiceEntity object, controller ID, or controller name.
properties (dict, optional) – Property key -> value mapping. If None, returns list of valid property keys (discovery mode).
allow_dynamic (bool) – If True, skip validation. Default False.
- Returns:
If properties is None, returns list of valid property keys. ControllerServiceDTO: If properties provided, returns validated DTO
ready to pass to update_controller().
- Return type:
- Raises:
ValueError – If any property key not found in controller’s descriptors (unless allow_dynamic=True). Error includes list of valid keys.
Example - Discovery:
>>> keys = nipyapi.canvas.prepare_controller_config(cs) ['Schema Access Strategy', 'Schema Registry', 'Schema Name', ...]
Example - Prepare:
>>> config = nipyapi.canvas.prepare_controller_config(cs, { ... 'Schema Access Strategy': 'schema-name' ... }) >>> nipyapi.canvas.update_controller(cs, update=config)
Example - Clear property:
>>> config = nipyapi.canvas.prepare_controller_config(cs, { ... 'Schema Name': None # Clears to unset state ... })
- nipyapi.canvas.prepare_processor_config(processor, properties=None, allow_dynamic=False)[source]
Discover valid property keys or prepare a validated ProcessorConfigDTO.
Two modes based on whether properties is provided: - Discovery: Call with just processor to get list of valid property keys - Prepare: Call with properties dict to get validated ProcessorConfigDTO
This helps prevent silent creation of dynamic properties from typos or guessed property names. NiFi accepts any property name and silently creates dynamic properties for unknown keys, which can cause hard-to-debug issues.
- Property Value Semantics:
value(string): Set property to that value''(empty string): Set property to empty stringNone: For static properties, clears/unsets the value. For dynamic properties, deletes the property entirely.
- Parameters:
processor (ProcessorEntity or str) – The processor to inspect, as a ProcessorEntity object, processor ID, or processor name.
properties (dict, optional) – Property key -> value mapping. If None, returns list of valid property keys (discovery mode).
allow_dynamic (bool) – If True, skip validation. Use for processors like UpdateAttribute that expect dynamic properties. Default False.
- Returns:
If properties is None, returns list of valid property keys. ProcessorConfigDTO: If properties provided, returns validated DTO
ready to pass to update_processor().
- Return type:
- Raises:
ValueError – If any property key not found in processor’s descriptors (unless allow_dynamic=True). Error includes list of valid keys.
Example - Discovery:
>>> keys = nipyapi.canvas.prepare_processor_config(proc) ['SQL Query', 'Database Connection Pooling Service', 'Max Wait Time', ...]
Example - Prepare:
>>> config = nipyapi.canvas.prepare_processor_config(proc, { ... 'SQL Query': 'SELECT * FROM users', ... 'Database Connection Pooling Service': controller.id ... }) >>> nipyapi.canvas.update_processor(proc, update=config, auto_stop=True)
Example - Dynamic properties (UpdateAttribute):
>>> config = nipyapi.canvas.prepare_processor_config(proc, { ... 'my.custom.attr': '${filename}' ... }, allow_dynamic=True)
Example - Clear static property:
>>> config = nipyapi.canvas.prepare_processor_config(proc, { ... 'Custom Text': None # Clears to unset state ... })
Example - Delete dynamic property:
>>> config = nipyapi.canvas.prepare_processor_config(proc, { ... 'my.custom.attr': None # Removes the dynamic property entirely ... }, allow_dynamic=True)
- nipyapi.canvas.purge_connection(con_id)[source]
Drops all FlowFiles in a given connection. Waits until the action is complete before returning.
Note that if upstream component isn’t stopped, more data may flow into the connection after this action.
- Parameters:
con_id (str) – The UUID of the Connection to be purged
- Returns:
The status reporting object for the drop request.
- Return type:
- nipyapi.canvas.purge_process_group(process_group, stop=False, greedy=True, identifier_type='auto')[source]
Purges the connections in a given Process Group of FlowFiles, and optionally stops it first
- Parameters:
process_group (ProcessGroupEntity or str) – Target Process Group, as a ProcessGroupEntity object, process group ID, or name.
stop (Optional [bool]) – Whether to stop the Process Group before action
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
True|False}]): Result set. A list of Dicts of
- Return type:
(list[dict{ID
Connection IDs mapped to True or False for success of each connection
- Raises:
TypeError – If process_group is not a string or ProcessGroupEntity.
ValueError – If process group not found or multiple matches found.
- nipyapi.canvas.recurse_flow(pg_id='root')[source]
Returns information about a Process Group and all its Child Flows.
Recurses the child flows by appending each process group with a ‘nipyapi_extended’ parameter which contains the child process groups, etc.
Note: This previously used actual recursion which broke on large NiFi environments, we now use a task/list update approach.
- Parameters:
pg_id (str) – The Process Group UUID
- Returns:
enriched NiFi Flow object
- Return type:
- nipyapi.canvas.schedule_all_controllers(pg_id, scheduled)[source]
Enable or Disable all Controller Services in a Process Group.
Uses NiFi’s native bulk controller service activation API which handles all descendant controller services automatically. Waits for all controllers to reach the target state before returning.
Note
When enabling, INVALID controllers (those with validation errors) are skipped since NiFi cannot enable them. The function waits only for VALID controllers to reach ENABLED state. When disabling, all controllers are included since any controller can be disabled.
- Parameters:
- Returns:
The result of the operation
- Return type:
- Raises:
ValueError – If scheduled is not a bool or valid state string.
- nipyapi.canvas.schedule_components(pg_id, scheduled, components=None)[source]
Change the scheduled target state of a list of components within a Process Group.
Note that this does not guarantee that components will be Started or Stopped afterwards, merely that they will have their scheduling updated.
This function only supports RUNNING and STOPPED states. For RUN_ONCE, use
schedule_processor()on individual processors.- Parameters:
- Returns:
True for success, False for failure
- Return type:
- Raises:
ValueError – If scheduled is not a bool or valid state string.
- nipyapi.canvas.schedule_controller(controller, scheduled, refresh=False, greedy=True, identifier_type='auto')[source]
Start/Enable or Stop/Disable a Controller Service
- Parameters:
controller (ControllerServiceEntity or str) – Target Controller to schedule, as a ControllerServiceEntity object, controller ID, or controller name.
scheduled (bool or str) – True/False for ENABLED/DISABLED, or one of “ENABLED”, “DISABLED”.
refresh (bool) – Whether to refresh the component revision before execution
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
(ControllerServiceEntity)
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – If controller not found, multiple matches found, or scheduled is not a bool or valid state string.
- nipyapi.canvas.schedule_port(port, scheduled, refresh=True, greedy=True, identifier_type='auto')[source]
Set a Port to Start, Stop, or Disable.
Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the port reaches the target state.
- Parameters:
port (str or PortEntity) – The Port, as a PortEntity object, port ID, or port name.
scheduled (bool or str) – True/False for RUNNING/STOPPED, or one of “RUNNING”, “STOPPED”, “DISABLED”.
refresh (bool) – Whether to refresh the object before action.
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
True for success, False for failure.
- Return type:
- Raises:
TypeError – If port is not a string or PortEntity.
ValueError – If port not found, multiple matches found, or scheduled is not a bool or valid state string.
Example:
# Start a port by ID nipyapi.canvas.schedule_port("<port-id>", True) # Start a port object nipyapi.canvas.schedule_port(port, True) # Stop a port nipyapi.canvas.schedule_port(port, False) # Disable a port nipyapi.canvas.schedule_port(port, "DISABLED")
- nipyapi.canvas.schedule_process_group(process_group_id, scheduled, greedy=True, identifier_type='auto')[source]
Start or Stop a Process Group and all components.
Note that this doesn’t guarantee that all components have started, as some may be in Invalid states.
- Parameters:
process_group_id (str or ProcessGroupEntity) – The target Process Group, as a UUID string, process group name, or ProcessGroupEntity object.
scheduled (bool or str) – True/False for RUNNING/STOPPED, or one of “RUNNING”, “STOPPED”.
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
True of successfully scheduled, False if not
- Return type:
(bool)
- Raises:
TypeError – If process_group_id is not a string or ProcessGroupEntity.
ValueError – If process group not found, multiple matches found, or scheduled is not a bool or valid state string.
- nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True, greedy=True, identifier_type='auto')[source]
Set a Processor to Start, Stop, Disable, or Run Once.
Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor reaches the target state.
- Parameters:
processor (str or ProcessorEntity) – The Processor, as a ProcessorEntity object, processor ID, or processor name.
scheduled (bool or str) – True/False for RUNNING/STOPPED, or one of “RUNNING”, “STOPPED”, “DISABLED”, “RUN_ONCE”.
refresh (bool) – Whether to refresh the object before action.
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
True for success, False for failure.
- Return type:
- Raises:
TypeError – If processor is not a string or ProcessorEntity.
ValueError – If processor not found, multiple matches found, or scheduled is not a bool or valid state string.
Example:
# Start a processor by ID nipyapi.canvas.schedule_processor("<processor-id>", True) # Start a processor object (backwards compatible) nipyapi.canvas.schedule_processor(proc, True) # Stop a processor nipyapi.canvas.schedule_processor(proc, False) # Disable a processor (prevents starting, useful for maintenance) nipyapi.canvas.schedule_processor(proc, "DISABLED") # Run once - executes one scheduling cycle then stops nipyapi.canvas.schedule_processor(proc, "RUN_ONCE")
- nipyapi.canvas.set_remote_process_group_transmission(rpg, enable=True, refresh=True)[source]
Enable or Disable Transmission for an RPG.
Waits for the transmission state to actually change before returning.
- Parameters:
rpg (RemoteProcessGroupEntity) – The remote process group to modify
enable (bool) – True to enable transmission, False to disable
refresh (bool) – Whether to refresh the object before action
- Returns:
The updated remote process group
- Return type:
- Raises:
ValueError – If the state change times out
- nipyapi.canvas.update_connection(connection, name=None, bends=None, refresh=True)[source]
Update a connection’s configuration.
Only parameters explicitly provided will be updated. To clear bends, pass an empty list.
- Parameters:
connection (ConnectionEntity or str) – ConnectionEntity or connection ID to update.
name (str or None) – New name for the connection, or None to leave unchanged.
bends (list or None) – Bend points as list of PositionDTO or (x, y) tuples. None keeps existing bends, [] clears all bends, […] sets specific points.
refresh (bool) – Whether to refresh the connection before updating.
- Returns:
The updated connection
- Return type:
Example:
# Clear all bends from a connection updated = nipyapi.canvas.update_connection(conn, bends=[]) # Clear bends by connection ID updated = nipyapi.canvas.update_connection("abc-123-uuid", bends=[]) # Rename a connection updated = nipyapi.canvas.update_connection(conn, name="Primary Path") # Set specific bend points updated = nipyapi.canvas.update_connection(conn, bends=[(500, 300), (500, 400)])
- nipyapi.canvas.update_controller(controller, update, refresh=True, greedy=True, identifier_type='auto', auto_disable=False)[source]
Updates the Configuration of a Controller Service
Note
This function does not handle referencing components. If the controller has processors or other controllers that reference it, you must stop/disable those separately before updating.
- Parameters:
controller (ControllerServiceEntity or str) – Target Controller to update, as a ControllerServiceEntity object, controller ID, or controller name.
update (ControllerServiceDTO) – Controller Service configuration object containing the new config params and properties
refresh (bool) – Whether to refresh the controller to get latest revision before update. Defaults to True to avoid stale revision errors.
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
auto_disable (bool) – If True, automatically disable the controller before updating and re-enable afterward if it was enabled. Default False.
- Returns:
(ControllerServiceEntity)
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – If controller not found, multiple matches found, or controller is enabled and auto_disable=False.
- nipyapi.canvas.update_process_group(pg, update, refresh=True, greedy=True, identifier_type='auto')[source]
Updates a given Process Group.
- Parameters:
pg (ProcessGroupEntity or str) – The Process Group to target for update, as a ProcessGroupEntity object, process group ID, or name.
update (dict) – key:value pairs to update (e.g., name, comments)
refresh (bool) – Whether to refresh the Process Group before applying the update
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
The updated ProcessGroupEntity
- Return type:
- Raises:
TypeError – If pg is not a string or ProcessGroupEntity.
ValueError – If process group not found or multiple matches found.
Example:
# Rename a process group nipyapi.canvas.update_process_group(pg, {'name': 'New Name'}) # Update comments nipyapi.canvas.update_process_group(pg, {'comments': 'Updated'})
- nipyapi.canvas.update_processor(processor, update=None, name=None, refresh=True, auto_stop=False)[source]
Updates a Processor’s configuration and/or name.
- For configuration changes, pass a ProcessorConfigDTO:
nifi.ProcessorConfigDTO(scheduling_period=’3s’)
For renaming, pass the new name. Both can be provided together.
Processors must be stopped for certain updates (including renaming). If auto_stop is True (default), the processor will be stopped before updating and restarted afterward if it was originally running.
- Parameters:
processor (ProcessorEntity) – The Processor to target for update
update (ProcessorConfigDTO, optional) – Configuration parameters to update
name (str, optional) – New name for the processor
refresh (bool) – Whether to refresh the Processor object state before applying the update. Default True.
auto_stop (bool) – If True, automatically stop the processor before updating and restart afterward if it was running. Default False.
- Returns:
The updated ProcessorEntity
- Return type:
- Raises:
ValueError – If neither update nor name is provided, or if update is not a ProcessorConfigDTO, or if processor is running and auto_stop=False.
- nipyapi.canvas.verify_controller(controller, properties=None, attributes=None, greedy=True, identifier_type='auto')[source]
Verify a controller service’s configuration properties are valid.
Validates that all required properties are set and property values meet their defined constraints. Does NOT test actual connectivity or credentials. Handles the async verification workflow: submit request, poll until complete, cleanup.
The controller service must be DISABLED before verification.
- Parameters:
controller – ControllerServiceEntity, controller ID, or controller name.
properties – Optional dict of property overrides to verify
attributes – Optional dict of FlowFile attributes for Expression Language
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
Verification results. Each has verification_step_name, outcome (“SUCCESSFUL”/”FAILED”/”SKIPPED”), and explanation.
- Return type:
- Raises:
TypeError – If controller is not a string or ControllerServiceEntity.
ValueError – Controller not found, multiple matches found, or is currently enabled.
ApiException – NiFi API errors
Example:
results = nipyapi.canvas.verify_controller(dbcp_service) for r in results: print(f"{r.verification_step_name}: {r.outcome}") if r.outcome == "FAILED": print(f" Reason: {r.explanation}")
- nipyapi.canvas.verify_processor(processor, properties=None, attributes=None, greedy=True, identifier_type='auto')[source]
Verify a processor’s configuration properties are valid.
Validates that all required properties are set and property values meet their defined constraints. Does NOT test actual connectivity or external service availability. Handles the async verification workflow: submit request, poll until complete, cleanup.
The processor must be STOPPED before verification.
- Parameters:
processor – ProcessorEntity, processor ID, or processor name.
properties – Optional dict of property overrides to verify
attributes – Optional dict of FlowFile attributes for Expression Language
greedy (bool) – For name lookup, True for partial match, False for exact.
identifier_type (str) – How to interpret string identifier: “auto” (default) detects UUID vs name, “id” or “name” to force.
- Returns:
Verification results. Each has verification_step_name, outcome (“SUCCESSFUL”/”FAILED”/”SKIPPED”), and explanation.
- Return type:
- Raises:
TypeError – If processor is not a string or ProcessorEntity.
ValueError – Processor not found, multiple matches found, or is currently running.
ApiException – NiFi API errors
Example:
results = nipyapi.canvas.verify_processor(my_processor) for r in results: print(f"{r.verification_step_name}: {r.outcome}") if r.outcome == "FAILED": print(f" Reason: {r.explanation}")