Demos
These modules leverage functionality within the rest of the Package to demonstrate various capabilities
Client SDK modules
These wrapper modules contain collections of convenience functions for daily operations of your NiFi and NiFi-Registry environment. They wrap and surface underlying data structures and calls to the full SDK swagger clients which are also included in the package.
Canvas
For interactions with the NiFi Canvas.
- nipyapi.canvas.create_connection(source, target, relationships=None, name=None)[source]
Creates a connection between two objects for the given relationships
- Parameters:
source – Object to initiate the connection, e.g. ProcessorEntity
target – Object to terminate the connection, e.g. FunnelEntity
relationships (list) – list of strings of relationships to connect, may be collected from the object ‘relationships’ property (optional)
name (str) – Defaults to None, String of Name for Connection (optional)
- Returns:
for the created connection
- Return type:
- nipyapi.canvas.create_controller(parent_pg, controller, name=None)[source]
- Creates a new Controller Service in a given Process Group of the given
Controller type, with the given Name
- Parameters:
parent_pg (ProcessGroupEntity) – Target Parent PG
controller (DocumentedTypeDTO) – Type of Controller to create, found via the list_all_controller_types method
name (str[Optional]) – Name for the new Controller as a String
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.create_funnel(pg_id, position=None)[source]
Creates a Funnel Object
- Parameters:
pg_id (str) – ID of the parent Process Group
position (tuple[int, int]) – Position on canvas
- Returns:
(FunnelEntity) Created Funnel
- nipyapi.canvas.create_port(pg_id, port_type, name, state, position=None)[source]
Creates a new input or output port of given characteristics
- Parameters:
pg_id (str) – ID of the parent Process Group
port_type (str) – Either of INPUT_PORT or OUTPUT_PORT
name (str) – optional, Name to assign to the port
state (str) – One of RUNNING, STOPPED, DISABLED
position (tuple) – optional, tuple of ints like (400, 400)
- Returns:
(PortEntity) of the created port
- nipyapi.canvas.create_process_group(parent_pg, new_pg_name, location, comment='')[source]
Creates a new Process Group with the given name under the provided parent Process Group at the given Location
- Parameters:
parent_pg (ProcessGroupEntity) – The parent Process Group to create the new process group in
new_pg_name (str) – The name of the new Process Group
location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
comment (str) – Entry for the Comments field
- Returns:
The new Process Group
- Return type:
- nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None)[source]
Instantiates a given processor on the canvas
- Parameters:
parent_pg (ProcessGroupEntity) – The parent Process Group
processor (DocumentedTypeDTO) – The abstract processor type object to be instantiated
location (tuple[x, y]) – The location coordinates
name (Optional [str]) – The name for the new Processor
config (Optional [ProcessorConfigDTO]) – A configuration object for the new processor
- Returns:
The new Processor
- Return type:
- nipyapi.canvas.create_remote_process_group(target_uris, transport='RAW', pg_id='root', position=None)[source]
Creates a new Remote Process Group with given parameters
- Parameters:
target_uris (str) – Comma separated list of target URIs
transport (str) – optional, RAW or HTTP
pg_id (str) – optional, UUID of parent Process Group for remote process group
position (tuple) – optional, tuple of location ints
- Returns:
(RemoteProcessGroupEntity)
- nipyapi.canvas.delete_connection(connection, purge=False)[source]
Deletes a connection, optionally purges it first
- Parameters:
connection (ConnectionEntity) – Connection to delete
purge (bool) – True to Purge, Defaults to False
- Returns:
the modified Connection
- Return type:
- nipyapi.canvas.delete_controller(controller, force=False)[source]
Delete a Controller service, with optional prejudice
- Parameters:
controller (ControllerServiceEntity) – Target Controller to delete
force (bool) – True to attempt Disable the Controller before deletion
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.delete_funnel(funnel, refresh=True)[source]
Deletes a Funnel Object
- Parameters:
funnel (FunnelEntity) – The Funnel to delete
refresh (bool) – Whether to refresh the object state before execution
- Returns:
(FunnelEntity) Deleted FunnelEntity reference
- nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True)[source]
Deletes a given Process Group, with optional prejudice.
- Parameters:
process_group (ProcessGroupEntity) – The target Process Group
force (bool) – Stop, purge and clean the target Process Group before deletion. Experimental.
refresh (bool) – Whether to refresh the state first
- Returns:
The updated object state
- Return type:
- nipyapi.canvas.delete_processor(processor, refresh=True, force=False)[source]
Deletes a Processor from the canvas, with optional prejudice.
- Parameters:
processor (ProcessorEntity) – The processor to delete
refresh (bool) – Whether to refresh the Processor state before action
force (bool) – Whether to stop, purge and remove connections to the Processor before deletion. Behavior may change in future releases.
- Returns:
The updated ProcessorEntity
- Return type:
- nipyapi.canvas.delete_remote_process_group(rpg, refresh=True)[source]
Deletes a given remote process group
- Parameters:
rpg (RemoteProcessGroupEntity) – Remote Process Group to remove
refresh (bool) – Whether to refresh the object before action
- Returns:
(RemoteProcessGroupEntity)
- nipyapi.canvas.get_bulletin_board()[source]
Retrieves the bulletin board object
- Returns:
The native datatype BulletinBoard object
- Return type:
- nipyapi.canvas.get_bulletins()[source]
Retrieves current bulletins (alerts) from the Flow Canvas
- Returns:
The native datatype containing a list
- Return type:
of bulletins
- nipyapi.canvas.get_component_connections(component)[source]
Returns list of Connections related to a given Component, e.g. Processor
- Parameters:
component – Component Object to filter by, e.g. a ProcessorEntity
- Returns:
List of ConnectionEntity Objects
- Return type:
(list)
- nipyapi.canvas.get_controller(identifier, identifier_type='name', bool_response=False)[source]
Retrieve a given Controller
- Parameters:
identifier (str) – ID or Name of a Controller to find
identifier_type (str) – ‘id’ or ‘name’, defaults to name
bool_response (bool) – If True, will return False if the Controller is not found - useful when testing for deletion completion
Returns:
- nipyapi.canvas.get_flow(pg_id='root')[source]
Returns information about a Process Group and flow.
This surfaces the native implementation, for the recursed implementation see ‘recurse_flow’
- Parameters:
pg_id (str) – id of the Process Group to retrieve, defaults to the root process group if not set
- Returns:
The Process Group object
- Return type:
- nipyapi.canvas.get_pg_parents_ids(pg_id)[source]
Retrieve the ids of the parent Process Groups.
- Parameters:
pg_id (str) – Process group id
- Returns:
(list) List of ids of the input PG parents
- nipyapi.canvas.get_process_group(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all process groups against a given identifier string occurring in a given identifier_type field.
- Parameters:
identifier (str) – the string to filter the list for
identifier_type (str) – the field to filter on, set in config.py
greedy (bool) – True for partial match, False for exact match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.canvas.get_process_group_status(pg_id='root', detail='names')[source]
Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings
Note that there is also a ‘process group status’ command available, but it returns a subset of this data anyway, and this call is more useful
- Parameters:
pg_id (str) – The UUID of the Process Group
detail (str) – ‘names’ or ‘all’; whether to return a simple dict of name:id pairings, or the full details. Defaults to ‘names’
- Returns:
The Process Group Entity including the status
- Return type:
- nipyapi.canvas.get_processor(identifier, identifier_type='name', greedy=True)[source]
Filters the list of all Processors against the given identifier string in the given identifier_type field
- Parameters:
identifier (str) – The String to filter against
identifier_type (str) – The field to apply the filter to. Set in config.py
greedy (bool) – Whether to exact match (False) or partial match (True)
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.canvas.get_processor_type(identifier, identifier_type='name', greedy=True)[source]
Gets the abstract object describing a Processor, or list thereof
- Parameters:
identifier (str) – the string to filter the list for
identifier_type (str) – the field to filter on, set in config.py
greedy (bool) – False for exact match, True for greedy match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.canvas.get_remote_process_group(rpg_id, summary=False)[source]
Fetch a remote process group object, with optional summary of just ports
- nipyapi.canvas.get_root_pg_id()[source]
Convenience function to return the UUID of the Root Process Group
Returns (str): The UUID of the root PG
- nipyapi.canvas.get_variable_registry(process_group, ancestors=True)[source]
Gets the contents of the variable registry attached to a Process Group
- Parameters:
process_group (ProcessGroupEntity) – The Process Group to retrieve the Variable Registry from
ancestors (bool) – Whether to include the Variable Registries from child Process Groups
- Returns:
The Variable Registry
- Return type:
- nipyapi.canvas.list_all_by_kind(kind, pg_id='root', descendants=True)[source]
Retrieves a list of all instances of a supported object type
- Parameters:
kind (str) – one of input_ports, output_ports, funnels, controllers, connections, remote_process_groups
pg_id (str) – optional, ID of the Process Group to use as search base
descendants (bool) – optional, whether to collect child group info
- Returns:
list of the Entity type of the kind, or single instance, or None
- nipyapi.canvas.list_all_connections(pg_id='root', descendants=True)[source]
Lists all connections for a given Process Group ID
- Parameters:
pg_id (str) – ID of the Process Group to retrieve Connections from
descendants (bool) – True to recurse child PGs, False to not
- Returns:
List of ConnectionEntity objects
- Return type:
(list)
- nipyapi.canvas.list_all_controller_types()[source]
Lists all Controller Service types available on the environment
- Returns:
list(DocumentedTypeDTO)
- nipyapi.canvas.list_all_controllers(pg_id='root', descendants=True)[source]
- Lists all controllers under a given Process Group, defaults to Root
Optionally recurses all child Process Groups as well
- Parameters:
pg_id (str) – String of the ID of the Process Group to list from
descendants (bool) – True to recurse child PGs, False to not
- Returns:
None, ControllerServiceEntity, or list(ControllerServiceEntity)
- nipyapi.canvas.list_all_funnels(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for funnels
- nipyapi.canvas.list_all_input_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for input ports
- nipyapi.canvas.list_all_output_ports(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for output ports
- nipyapi.canvas.list_all_process_groups(pg_id='root')[source]
Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas.
Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessGroupEntity]
- nipyapi.canvas.list_all_processor_types()[source]
Produces the list of all available processor types in the NiFi instance
- Returns:
A native datatype containing the processors list
- Return type:
list(ProcessorTypesEntity)
- nipyapi.canvas.list_all_processors(pg_id='root')[source]
Returns a flat list of all Processors under the provided Process Group
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
- Returns:
list[ProcessorEntity]
- nipyapi.canvas.list_all_remote_process_groups(pg_id='root', descendants=True)[source]
Convenience wrapper for list_all_by_kind for remote process groups
- nipyapi.canvas.list_invalid_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors with Invalid Statuses
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
summary (bool) – True to return just the list of relevant properties per Processor, False for the full listing
- Returns:
list[ProcessorEntity]
- nipyapi.canvas.list_sensitive_processors(pg_id='root', summary=False)[source]
Returns a flattened list of all Processors on the canvas which have sensitive properties that would need to be managed during deployment
- Parameters:
pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
summary (bool) – True to return just the list of relevant properties per Processor, False for the full listing
- Returns:
list[ProcessorEntity] or list(dict)
- nipyapi.canvas.purge_connection(con_id)[source]
EXPERIMENTAL Drops all FlowFiles in a given connection. Waits until the action is complete before returning.
Note that if upstream component isn’t stopped, more data may flow into the connection after this action.
- Parameters:
con_id (str) – The UUID of the Connection to be purged
- Returns:
The status reporting object for the drop request.
- Return type:
- nipyapi.canvas.purge_process_group(process_group, stop=False)[source]
EXPERIMENTAL Purges the connections in a given Process Group of FlowFiles, and optionally stops it first
- Parameters:
process_group (ProcessGroupEntity) – Target Process Group
stop (Optional [bool]) – Whether to stop the Process Group before action
- Returns:
True|False}]): Result set. A list of Dicts of
- Return type:
(list[dict{ID
Connection IDs mapped to True or False for success of each connection
- nipyapi.canvas.recurse_flow(pg_id='root')[source]
Returns information about a Process Group and all its Child Flows. Recurses the child flows by appending each process group with a ‘nipyapi_extended’ parameter which contains the child process groups, etc. Note: This previously used actual recursion which broke on large NiFi
environments, we now use a task/list update approach
- Parameters:
pg_id (str) – The Process Group UUID
- Returns:
enriched NiFi Flow object
- Return type:
- nipyapi.canvas.schedule_components(pg_id, scheduled, components=None)[source]
Changes the scheduled target state of a list of components within a given Process Group.
Note that this does not guarantee that components will be Started or Stopped afterwards, merely that they will have their scheduling updated.
- Parameters:
pg_id (str) – The UUID of the parent Process Group
scheduled (bool) – True to start, False to stop
components (list[ComponentType]) – The list of Component Entities to schdule, e.g. ProcessorEntity’s
- Returns:
True for success, False for not
- Return type:
(bool)
- nipyapi.canvas.schedule_controller(controller, scheduled, refresh=False)[source]
Start/Enable or Stop/Disable a Controller Service
- Parameters:
controller (ControllerServiceEntity) – Target Controller to schedule
scheduled (bool) – True to start, False to stop
refresh (bool) – Whether to refresh the component revision before execution
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.schedule_process_group(process_group_id, scheduled)[source]
Start or Stop a Process Group and all components.
Note that this doesn’t guarantee that all components have started, as some may be in Invalid states.
- Parameters:
process_group_id (str) – The UUID of the target Process Group
scheduled (bool) – True to start, False to stop
- Returns:
True of successfully scheduled, False if not
- Return type:
(bool)
- nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True)[source]
Set a Processor to Start or Stop.
Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor starts
- Parameters:
processor (ProcessorEntity) – The Processor to target
scheduled (bool) – True to start, False to stop
refresh (bool) – Whether to refresh the object before action
- Returns:
True for success, False for failure
- Return type:
(bool)
- nipyapi.canvas.set_remote_process_group_transmission(rpg, enable=True, refresh=True)[source]
Enable or Disable Transmission for an RPG
- Parameters:
rpg (RemoteProcessGroupEntity) – The ID of the remote process group to modify
enable (bool) – True to enable, False to disable
refresh (bool) – Whether to refresh the object before action
Returns:
- nipyapi.canvas.update_controller(controller, update)[source]
Updates the Configuration of a Controller Service
- Parameters:
controller (ControllerServiceEntity) – Target Controller to update
update (ControllerServiceDTO) – Controller Service configuration object containing the new config params and properties
- Returns:
(ControllerServiceEntity)
- nipyapi.canvas.update_process_group(pg, update, refresh=True)[source]
Updates a given Process Group.
- Parameters:
pg (ProcessGroupEntity) – The Process Group to target for update
update (dict) – key:value pairs to update
refresh (bool) – Whether to refresh the Process Group before applying the update
- Returns:
The updated ProcessorEntity
- Return type:
- nipyapi.canvas.update_processor(processor, update, refresh=True)[source]
Updates configuration parameters for a given Processor.
An example update would be: nifi.ProcessorConfigDTO(scheduling_period=’3s’)
- Parameters:
processor (ProcessorEntity) – The Processor to target for update
update (ProcessorConfigDTO) – The new configuration parameters
refresh (bool) – Whether to refresh the Processor object state before applying the update
- Returns:
The updated ProcessorEntity
- Return type:
- nipyapi.canvas.update_variable_registry(process_group, update, refresh=True)[source]
Updates one or more key:value pairs in the variable registry
- Parameters:
process_group (ProcessGroupEntity) – The Process Group which has the
updated (Variable Registry to be) –
update (list[tuple]) – The variables to write to the registry
refresh (bool) – Whether to refresh the object revision before updating
- Returns:
The created or updated Variable Registry Entries
- Return type:
Config
A set of defaults and parameters used elsewhere in the project. Also provides a handy link to the low-level client SDK configuration singleton objects.
Parameters
For Managing NiFi Parameter Contexts
- nipyapi.parameters.assign_context_to_process_group(pg, context_id, cascade=False)[source]
Assigns a given Parameter Context to a specific Process Group Optionally cascades down to direct children Process Groups
- Parameters:
pg (ProcessGroupEntity) – The Process Group to target
context_id (str) – The ID of the Parameter Context
cascade (bool) – Cascade Parameter Context down to child Process Groups?
- Returns:
(ProcessGroupEntity) The updated Process Group
- nipyapi.parameters.create_parameter_context(name, description=None, parameters=None, inherited_contexts=None)[source]
- Create a new Parameter Context with optional description and
initial Parameters
- Parameters:
name (str) – The Name for the new Context
description (str) – An optional description
parameters (list[ParameterEntity]) – A list of prepared Parameters
inherited_contexts (list[ParameterContextEntity]) – A list of inherited Parameter Contexts
- Returns:
(ParameterContextEntity) The New Parameter Context
- nipyapi.parameters.delete_parameter_context(context, refresh=True)[source]
Removes a Parameter Context
- Parameters:
context (ParameterContextEntity) – Parameter Context to be deleted
refresh (bool) – Whether to refresh the Context before Deletion
- Returns:
(ParameterContextEntity) The removed Parameter Context
- nipyapi.parameters.delete_parameter_from_context(context, parameter_name)[source]
Delete a specific Parameter from a Parameter Context :param context: The Parameter Context to Update :type context: ParameterContextEntity :param parameter_name: The Parameter to delete :type parameter_name: str
- Returns:
(ParameterContextEntity) The updated Parameter Context
- nipyapi.parameters.get_parameter_context(identifier, identifier_type='name', greedy=True)[source]
Gets one or more Parameter Contexts matching a given identifier
- Parameters:
identifier (str) – The Name or ID matching Parameter Context(s)
identifier_type (str) – ‘name’ or ‘id’
greedy (bool) – False for exact match, True for string match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.parameters.list_all_parameter_contexts()[source]
Lists all Parameter Contexts available on the Canvas
- Returns:
list(ParameterContextEntity)
- nipyapi.parameters.prepare_parameter(name, value, description=None, sensitive=False)[source]
Parses basic inputs into a Parameter object ready for submission
- Parameters:
name (str) – The Name for the Parameter
value (str, int, float) – The Value for the Parameter
description (str) – Optional Description for the Parameter
sensitive (bool) – Whether to mark the Parameter Value as sensitive
- Returns:
(ParameterEntity) The ParameterEntity ready for use
- nipyapi.parameters.remove_context_from_process_group(pg)[source]
Clears any Parameter Context from the given Process Group
- Parameters:
pg (ProcessGroupEntity) – The Process Group to target
- Returns:
(ProcessGroupEntity) The updated Process Group
- nipyapi.parameters.update_parameter_context(context)[source]
Update an already existing Parameter Context
- Parameters:
context (ParameterContextEntity) – Parameter Context updated to be applied
refresh (bool) – Whether to refresh the object before Updating
- Returns:
(ParameterContextEntity) The updated Parameter Context
- nipyapi.parameters.upsert_parameter_to_context(context, parameter)[source]
Insert or Update Parameter within a Parameter Context
- Parameters:
context (ParameterContextEntity) – The Parameter Context to Modify
parameter (ParameterEntity) – The ParameterEntity to insert or update
- Returns:
(ParameterContextEntity) The updated Parameter Context
Security
Secure connectivity management for NiPyApi
- nipyapi.security.add_user_group_to_access_policy(user_group, policy, service='nifi', refresh=True)[source]
Attempts to add the given user group object to the given access policy
- Parameters:
user_group (UserGroup) or (UserGroupEntity) – User group object to add
policy (AccessPolicyEntity) or (AccessPolicy) – Access Policy object
service (str) – ‘nifi’ or ‘registry’ to identify the target service
refresh (bool) – Whether to refresh the policy object before submission
- Returns:
Updated Policy object
- nipyapi.security.add_user_to_access_policy(user, policy, service='nifi', refresh=True, strict=True)[source]
Attempts to add the given user object to the given access policy
- Parameters:
user (User) or (UserEntity) – User object to add
policy (AccessPolicyEntity) or (AccessPolicy) – Access Policy object
service (str) – ‘nifi’ or ‘registry’ to identify the target service
refresh (bool) – Whether to refresh the policy object before submit
strict (bool) – If True, will return error if user already present, if False will ignore the already exists
- Returns:
Updated Policy object
- nipyapi.security.bootstrap_security_policies(service, user_identity=None, group_identity=None)[source]
Creates a default security context within NiFi or Nifi-Registry
- Parameters:
service (str) – ‘nifi’ or ‘registry’ to indicate which service
user_identity – a service user to establish in the security context
group_identity – a service group to establish in the security context
- Returns:
None
- nipyapi.security.create_access_policy(resource, action, r_id=None, service='nifi')[source]
Creates an access policy for the given resource, action and optionally resource_id for NiFi.
- Parameters:
resource (str) – a valid resource type for this service, e.g. ‘bucket’
action (str) – a valid action type for this service, typically ‘read’, ‘write’ or ‘delete’
r_id (optional[str]) – if NiFi, the resource ID of the resource
service (str) – the service to target
- Returns:
An access policy object for that service
- nipyapi.security.create_service_user(identity, service='nifi', strict=True)[source]
Attempts to create a user with the provided identity in the given service
- Parameters:
identity (str) – Identity string for the user
service (str) – ‘nifi’ or ‘registry’
strict (bool) – If Strict, will error if user already exists
- Returns:
The new (User) or (UserEntity) object
- nipyapi.security.create_service_user_group(identity, service='nifi', users=None, strict=True)[source]
Attempts to create a user with the provided identity and member users in the given service
- Parameters:
identity (str) – Identity string for the user group
service (str) – ‘nifi’ or ‘registry’
users (list) – A list of nifi.UserEntity or registry.User belonging to the group
strict (bool) – Whether to throw an error on already exists
- Returns:
The new (UserGroup) or (UserGroupEntity) object
- nipyapi.security.get_access_policy_for_resource(resource, action, r_id=None, service='nifi', auto_create=False)[source]
Attempts to retrieve the access policy for a given resource and action, and optionally resource_id if targeting NiFi. Optionally creates the policy if it doesn’t already exist
- Parameters:
resource (str) – A valid resource in the target service
action (str) – A valid action, typically ‘read’, ‘write’ or ‘delete’
r_id (Optional[str]) – The UUID of the resource, valid only if targeting NiFi resources
service (str) – Which service to target, typically ‘nifi’ or ‘registry’
auto_create (bool) – Whether to create the targeted policy if it doesn’t already exist
- Returns:
The relevant AccessPolicy object
- nipyapi.security.get_service_access_status(service='nifi', bool_response=False)[source]
Gets the access status for the current session
- Parameters:
service (str) – A String of ‘nifi’ or ‘registry’ to indicate which service to check status for
bool_response (bool) – If True, the function will return False on hitting an Error instead of raising it. Useful for connection testing.
- Returns:
(bool) if bool_response, else the Service Access Status of the User
- nipyapi.security.get_service_user(identifier, identifier_type='identity', service='nifi')[source]
Gets the unique user matching to the given identifier and type.
- Parameters:
identifier (str) – the string to search for
identifier_type (str) – the field to search in
service (str) – the name of the service
- Returns:
None if no match, else single object
- nipyapi.security.get_service_user_group(identifier, identifier_type='identity', service='nifi')[source]
Gets the unique group matching to the given identifier and type.
- Parameters:
identifier (str) – the string to search for
identifier_type (str) – the field to search in, identity or id
service (str) – the name of the service
- Returns:
None if no match, else single object
- nipyapi.security.list_service_user_groups(service='nifi')[source]
Returns list of service user groups for a given service :param service: ‘nifi’ or ‘registry’ :type service: str
- Returns:
[(nifi.UserGroupEntity, registry.UserGroup)]
- nipyapi.security.list_service_users(service='nifi')[source]
Lists all users of a given service, takes a service name as a string
- nipyapi.security.remove_service_user(user, service='nifi', strict=True)[source]
Removes a given User from the given Service
- Parameters:
user – [(nifi.UserEntity), (registry.User)] Target User object
service (str) – ‘nifi’ or ‘registry’
strict (bool) – Whether to throw an error if User not found
- Returns:
Updated User Entity or None
- nipyapi.security.remove_service_user_group(group, service='nifi', strict=True)[source]
Removes a given User Group from the given Service
- Parameters:
group – [(nifi.UserEntity), (registry.User)] Target User object
service (str) – ‘nifi’ or ‘registry’
strict (bool) – Whether to throw an error if User not found
- Returns:
Updated User Group or None
- nipyapi.security.service_login(service='nifi', username=None, password=None, bool_response=False)[source]
Login to the currently configured NiFi or NiFi-Registry server.
Login requires a secure connection over https. Prior to calling this method, the host must be specified and the SSLContext should be configured (if necessary).
Successful login will result in a generated token (JWT) being cached in the api_client config that will be passed in all future REST API calls. To clear that token, call service_logout.
The token is temporary and will expire after a duration set by the server. After a token expires, you must call this method again to generate a new token.
- Parameters:
service (str) – ‘nifi’ or ‘registry’; the service to login to
username (str) – The username to submit
password (str) – The password to use
bool_response (bool) – If True, the function will return False instead of an error. Useful for connection testing.
- Returns:
True if successful, False or an Error if not. See bool_response
- Return type:
(bool)
- nipyapi.security.service_logout(service='nifi')[source]
Logs out from the service by resetting the token :param service: ‘nifi’ or ‘registry’; the target service :type service: str
- Returns:
True of access removed, False if still set
- Return type:
(bool)
- nipyapi.security.set_service_auth_token(token=None, token_name='tokenAuth', service='nifi')[source]
Helper method to set the auth token correctly for the specified service
- Parameters:
token (Optional[str]) – The token to set. Defaults to None.
token_name (str) – the api_key field name to set the token to. Defaults to ‘tokenAuth’
service (str) – ‘nifi’ or ‘registry’, the service to set
- Returns:
True on success, False if token not set
- Return type:
(bool)
- nipyapi.security.set_service_ssl_context(service='nifi', ca_file=None, client_cert_file=None, client_key_file=None, client_key_password=None, check_hostname=None)[source]
Create an SSLContext for connecting over https to a secured NiFi or NiFi-Registry instance.
This method can be used to create an SSLContext for two-way TLS in which a client cert is used by the service to authenticate the client.
This method can also be used for one-way TLS in which the client verifies the server’s certificate, but authenticates using a different form of credentials, such as LDAP username/password.
If you are using one-way TLS with a certificate signed by a root CA trusted by your system/platform, this step is not necessary as the default TLS-handshake should “just work.”
- Parameters:
service (str) – ‘nifi’ or ‘registry’ to indicate which service config to set the ssl context to
ca_file (str) – A PEM file containing certs for the root CA(s) for the NiFi Registry server
client_cert_file (str) – A PEM file containing the public certificates for the user/client identity
client_key_file (str) – An encrypted (password-protected) PEM file containing the client’s secret key
client_key_password (str) – The password to decrypt the client_key_file
check_hostname (bool) – Enable or Disable hostname checking
- Returns:
(None)
- nipyapi.security.update_access_policy(policy, service='nifi')[source]
Applies an updated access policy to the service indicated
- Parameters:
policy (PolicyEntity) – The policy object to submit
service (str) – ‘nifi’ or ‘registry’ to indicate the target service
- Returns:
The updated policy if successful
- Return type:
(PolicyEntity)
System
For system and cluster level functions interacting with the underlying NiFi Services
- nipyapi.system.get_cluster()[source]
EXPERIMENTAL Returns the contents of the NiFi cluster
Returns (json):
- nipyapi.system.get_nifi_version_info()[source]
Returns the version information of the connected NiFi instance
Returns (VersionInfoDTO):
Templates
For Managing NiFi Templates
- nipyapi.templates.create_pg_snippet(pg_id)[source]
Creates a snippet of the targeted process group, and returns the object ready to be turned into a Template
- Parameters:
pg_id – UUID of the process Group to snippet
- Returns:
The Snippet Object
- Return type:
- nipyapi.templates.create_template(pg_id, name, desc='')[source]
Creates a Template from a Process Group
- Parameters:
pg_id (str) – The UUID of the target Process Group
name (str) – The name for the new Template. Must be unique
desc (optional[str]) – The description for the new Template
- Returns:
The newly created Template
- Return type:
- nipyapi.templates.delete_template(t_id)[source]
Deletes a Template
- Parameters:
t_id (str) – UUID of the Template to be deleted
- Returns:
The updated Template object
- nipyapi.templates.deploy_template(pg_id, template_id, loc_x=0.0, loc_y=0.0)[source]
Instantiates a given template request in a given process group
- Parameters:
pg_id (str) – The UUID of the Process Group to deploy into
template_id (str) – The UUID of the Template to deploy. Note that the Template must already be uploaded and available to the target Process Group
loc_x (float) – The X coordinate to deploy the Template at. Default(0.0)
loc_y (float) – The X coordinate to deploy the Template at. Default(0.0)
- Returns:
- The FlowEntity of the Process Group with the deployed
template
- Return type:
- nipyapi.templates.export_template(t_id, output='string', file_path=None)[source]
Exports a given Template as either a string or a file.
Note that to reimport the Template it must be a file
- Parameters:
t_id (str) – The UUID of the Template to export
output (str) – ‘string’ or ‘file’ to set the export action
file_path (Optional [str]) – The full path including filename to write the Template export to
- Returns:
- A String representation of the exported Template XML. Note
that this may not be utf-8 encoded.
- Return type:
(str)
- nipyapi.templates.get_template(identifier, identifier_type='name', greedy=False)[source]
Filters the list of all Templates for a given string in a given field. Note that filters are configured in config.py
- Parameters:
identifier (str) – The string to filter on
identifier_type (str) – The identifier of the field to filter on
greedy (bool) – True for greedy match, False for exact match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.templates.get_template_by_name(name)[source]
DEPRECATED Returns a specific template by name, if it exists.
Note: This function is replaced by get_template
- Parameters:
name (str) – The Name of the template, exact match required
- Returns:
(TemplateEntity)
- nipyapi.templates.list_all_templates(native=True)[source]
Gets a list of all templates on the canvas
- Returns:
A list of TemplateEntity’s
- Return type:
(list[TemplateEntity])
- nipyapi.templates.load_template_from_xml_file_path(file_path)[source]
Loads a TemplateEntity from an xml file for a given path
- Parameters:
file_path (str) – path to the xml file
- Returns:
TemplateEntity
- nipyapi.templates.load_template_from_xml_file_stream(file_stream)[source]
Loads a TemplateEntity from a template xml file
- Parameters:
file_stream (io stream) – the xml file stream as returned by open
- Returns:
TemplateEntity
- nipyapi.templates.load_template_from_xml_string(xml_string)[source]
Loads a TemplateEntity from xml string, as if you had read in the xml file to string
- Parameters:
xml_string (str) – string of xml
- Returns:
TemplateEntity
- nipyapi.templates.upload_template(pg_id, template_file)[source]
Uploads a given template xml from from the file system to the given Process Group
- Parameters:
pg_id (str) – The UUID of the Process Group to upload to
template_file (str) – The path including filename to the template file
- Returns:
The new Template object
- Return type:
Utils
Convenience utility functions for NiPyApi, not really intended for external use
- class nipyapi.utils.DockerContainer(name=None, image_name=None, image_tag=None, ports=None, env=None, volumes=None, test_url=None, endpoint=None)[source]
Bases:
object
Helper class for Docker container automation without using Ansible
- nipyapi.utils.bypass_slash_encoding(service, bypass)[source]
Instructs the API Client to bypass encoding the ‘/’ character
- Parameters:
service (str) – ‘nifi’ or ‘registry’
bypass (bool) – True will not encode ‘/’ in fields via API calls
- Returns:
None
- nipyapi.utils.check_version(base, comparator=None, service='nifi', default_version='0.2.0')[source]
Compares version base against either version comparator, or the version of the currently connected service instance.
Since NiFi is java, it may return a version with -SNAPSHOT as part of it. As such, that will be stripped from either the comparator version or the version returned from NiFi
- Parameters:
base (str) – The base version for the comparison test
comparator (optional[str]) – The version to compare against
default_version (optional[str]) – The version to assume the service is if the check cannot be completed
service (str) – The service to test the version against, currently only supports NiFi
Returns (int): -1/0/1 if base is lower/equal/newer than comparator
- nipyapi.utils.dump(obj, mode='json')[source]
- Dumps a native datatype object or swagger entity to json or yaml
defaults to json
- Parameters:
obj (varies) – The native datatype object or swagger type to serialise
mode (str) – ‘json’ or ‘yaml’, the supported export modes
Returns (str): The serialised object
- nipyapi.utils.enforce_min_ver(min_version, bool_response=False, service='nifi')[source]
Raises an error if target NiFi environment is not minimum version :param min_version: Version to check against :type min_version: str :param bool_response: If True, will return True instead of
raising error
- Returns:
(bool) or (NotImplementedError)
- nipyapi.utils.exception_handler(status_code=None, response=None)[source]
Simple Function wrapper to handle HTTP Status Exceptions
- nipyapi.utils.filter_obj(obj, value, key, greedy=True)[source]
Implements a custom filter method because native datatypes don’t have consistently named or located fields.
Note that each object used by this function must be registered with identifier_types and identifiers in config
- Parameters:
obj (varies) – the NiFi or NiFi-Registry object to filter on
value (str) – the String value to look for
key (str) – the object key to filter against
greedy (bool) – If True, the value will be matched anywhere in the string, if False it will require exact match
Returns: None if 0 matches, list if > 1, single Object entity if ==1
- nipyapi.utils.fs_read(file_path)[source]
Convenience function to read an Object from a FilePath
- Parameters:
file_path (str) – The Full path including filename to read from
Returns: The object that was read
- nipyapi.utils.fs_write(obj, file_path)[source]
Convenience function to write an Object to a FilePath
- Parameters:
obj (varies) – The Object to write out
file_path (str) – The Full path including filename to write to
Returns: The object that was written
- nipyapi.utils.infer_object_label_from_class(obj)[source]
- Returns the expected STRING label for an object class required by certain
functions.
- Parameters:
obj – The object to infer the name of
- Returns:
str of the relevant name, or raises an AssertionError
- nipyapi.utils.is_endpoint_up(endpoint_url)[source]
Tests if a URL is available for requests
- Parameters:
endpoint_url (str) – The URL to test
Returns (bool): True for a 200 response, False for not
- nipyapi.utils.load(obj, dto=None)[source]
Loads a serialised object back into native datatypes, and optionally imports it back into the native NiFi DTO
Warning: Using this on objects not produced by this Package may have unintended results! While efforts have been made to ensure that unsafe loading is not possible, no stringent security testing has been completed.
- Parameters:
obj (dict, list) – The serialised object to import
dto (Optional [tuple{str, str}]) – A Tuple describing the service and
constructed. (object that should be) –
= (e.g. dto) –
- Returns: Either the loaded object in native Python datatypes, or the
constructed native datatype object
- nipyapi.utils.set_endpoint(endpoint_url, ssl=False, login=False, username=None, password=None)[source]
EXPERIMENTAL
Sets the endpoint when switching between instances of NiFi or other projects. Not tested extensively with secured instances.
- Parameters:
endpoint_url (str) – The URL to set as the endpoint. Auto-detects the relevant service e.g. ‘http://localhost:18080/nifi-registry-api’
ssl (bool) – Whether to use the default security context in nipyapi.config to authenticate if a secure URL is detected
login (bool) – Whether to attempt login using default cred in config requires ssl to be set
username (str) – The username to use for login, if specified
password (str) – The password to use for login, if specified
Returns (bool): True for success, False for not
- nipyapi.utils.start_docker_containers(docker_containers, network_name='demo')[source]
Deploys a list of DockerContainer’s on a given network
- Parameters:
docker_containers (list[DockerContainer]) – list of Dockers to start
network_name (str) – The name of the Docker Bridge Network to get or create for the Docker Containers
Returns: Nothing
- nipyapi.utils.validate_parameters_versioning_support(verify_nifi=True, verify_registry=True)[source]
Convenience method to check if Parameters are supported :param verify_nifi: If True, check NiFi meets the min version :type verify_nifi: bool :param verify_registry: If True, check Registry meets the min version :type verify_registry: bool
- nipyapi.utils.wait_to_complete(test_function, *args, **kwargs)[source]
Implements a basic return loop for a given function which is capable of a True|False output
- Parameters:
test_function – Function which returns a bool once the target state is reached
delay (int) – The number of seconds between each attempt, defaults to config.short_retry_delay
max_wait (int) – the maximum number of seconds before issuing a Timeout, defaults to config.short_max_wait
*args – Any args to pass through to the test function
**kwargs – Any Keword Args to pass through to the test function
Returns (bool): True for success, False for not
Versioning
For interactions with the NiFi Registry Service and related functions
- nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]
Creates a new VersionedFlow stub in NiFi Registry. Can be used to write VersionedFlow information to without using a NiFi Process Group directly
- Parameters:
bucket_id (str) – UUID of the Bucket to write to
flow_name (str) – Name for the new VersionedFlow object
flow_desc (Optional [str]) – Description for the new VersionedFlow object
flow_type (Optional [str]) – Type of the VersionedFlow, should be ‘Flow’
- Returns:
(VersionedFlow)
- nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]
EXPERIMENTAL
Writes a FlowSnapshot into a VersionedFlow as a new version update
Note that this differs from save_flow_ver which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.
- Parameters:
flow (VersionedFlowObject) – the VersionedFlow object to write to
flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow
refresh (bool) – Whether to refresh the object status before actioning
- Returns:
The new (VersionedFlowSnapshot)
- nipyapi.versioning.create_registry_bucket(name)[source]
Creates a new Registry Bucket
- Parameters:
name (str) – name for the bucket, must be unique in the Registry
- Returns:
The new Bucket object
- Return type:
(Bucket)
- nipyapi.versioning.create_registry_client(name, uri, description)[source]
Creates a Registry Client in the NiFi Controller Services
- Parameters:
name (str) – The name of the new Client
uri (str) – The URI for the connection, such as ‘http://registry:18080’
description (str) – A description for the Client
- Returns:
The new registry client object
- Return type:
(FlowRegistryClientEntity)
- nipyapi.versioning.delete_registry_client(client, refresh=True)[source]
Deletes a Registry Client from the list of NiFI Controller Services
- Parameters:
client (FlowRegistryClientEntity) – The client to delete
refresh (bool) – Whether to refresh the object before action
- Returns:
The updated client object
- Return type:
(FlowRegistryClientEntity)
- nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]
Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.
- Parameters:
parent_id (str) – The ID of the parent Process Group to create the new process group in.
location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
bucket_id (str) – ID of the bucket containing the versioned flow to deploy.
reg_client_id (str) – ID of the registry client connection to use.
flow_id (str) – ID of the versioned flow to deploy.
version (Optional [int,str]) – version to deploy, if not provided latest version will be deployed.
- Returns:
(ProcessGroupEntity) of the newly deployed Process Group
- nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]
Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.
- Parameters:
bucket_id (str) – the UUID of the bucket containing the Flow
flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
version (Optional [None, Str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
file_path (str) – The path and filename to write to. Defaults to None which returns the serialised obj
mode (str) – ‘json’ or ‘yaml’ to specific the encoding format
- Returns:
(str) of the encoded Snapshot
- nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name', greedy=True)[source]
Filters the Flows in a Bucket against a particular identifier
- Parameters:
bucket_id (str) – UUID of the Bucket to filter against
identifier (str) – The string to filter on
identifier_type (str) – The param to check
greedy (bool) – False for exact match, True for greedy match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]
Retrieves the latest, or a specific, version of a Flow
- Parameters:
bucket_id (str) – the UUID of the bucket containing the Flow
flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
export (bool) – True to get the raw json object from the server for export, False to get the native DataType
- Returns:
If export=False, or the raw json otherwise
- Return type:
WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135 Which means you sometimes can’t trust the version count
- nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]
Gets the most recent version of a VersionedFlowSnapshot from a bucket
- Parameters:
bucket_id (str) – the UUID of the Bucket containing the flow
flow_id (str) – the UUID of the VersionedFlow to be retrieved
- Returns:
(VersionedFlowSnapshot)
- nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name', greedy=True)[source]
Filters the Bucket list to a particular identifier
- Parameters:
identifier (str) – the filter string
identifier_type (str) – the param to filter on
greedy (bool) – False for exact match, True for greedy match
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]
Filters the Registry clients to a particular identifier
- Parameters:
identifier (str) – the filter string
identifier_type (str) – the parameter to filter on
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.versioning.get_version_info(process_group)[source]
Gets the Version Control information for a particular Process Group
- Parameters:
process_group (ProcessGroupEntity) – the ProcessGroup to work with
- Returns:
(VersionControlInformationEntity)
- nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]
Imports a given encoded_flow version into the bucket and flow described, may optionally be passed a file to read the encoded flow_contents from.
Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.
- Parameters:
bucket_id (str) – UUID of the bucket to write the encoded_flow version
encoded_flow (Optional [str]) – The encoded flow to import; if not specified file_path is read from.
file_path (Optional [str]) – The file path to read the encoded flow from , if not specified encoded_flow is read from.
flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.
flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.
- Returns:
The new (VersionedFlowSnapshot)
- nipyapi.versioning.list_flow_versions(bucket_id, flow_id, registry_id=None, service='registry')[source]
EXPERIMENTAL List all the versions of a given Flow in a given Bucket
- Parameters:
bucket_id (str) – UUID of the bucket holding the flow to be enumerated
flow_id (str) – UUID of the flow in the bucket to be enumerated
registry_id (str) – UUID of the registry client linking the bucket, only required if requesting flows via NiFi instead of directly Registry
service (str) – Accepts ‘nifi’ or ‘registry’, indicating which service to query
- Returns:
- list(VersionedFlowSnapshotMetadata) or
(VersionedFlowSnapshotMetadataSetEntity)
- nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]
List of all Flows in a given NiFi Registry Bucket
- Parameters:
bucket_id (str) – The UUID of the Bucket to fetch from
- Returns:
(list[VersionedFlow]) objects
- nipyapi.versioning.list_registry_buckets()[source]
Lists all available Buckets in the NiFi Registry
- Returns:
(list[Bucket]) objects
- nipyapi.versioning.list_registry_clients()[source]
Lists the available Registry Clients in the NiFi Controller Services
- Returns:
(list[FlowRegistryClientEntity]) objects
- nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True, force=False)[source]
Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow with a new version
- Parameters:
process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version
registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to
bucket (Bucket) – the Bucket on the NiFi Registry to save to
flow_name (str) – A name for the VersionedFlow in the Bucket Note you need either a name for a new VersionedFlow, or the ID of an existing one to save a new version
flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow
comment (str) – A comment for the version commit
desc (str) – A description of the VersionedFlow
refresh (bool) – Whether to refresh the object revisions before action
force (bool) – Whether to Force Commit, or just regular Commit
- Returns:
(VersionControlInformationEntity)
- nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]
Changes a versioned flow to the specified version, or the latest version
- Parameters:
process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change
target_version (Optional [None, Int]) – Either None to move to the
version (latest available) –
to (or Int of the version number to move) –
- Returns:
True if successful, False if not
- Return type:
(bool)
Swagger Client SDKs
These sub-packages are full swagger clients to the NiFi and NiFi-Registry APIs and may be used directly, or wrapped into the NiPyApi SDK convenience functions