Demos

These modules use functionality from the rest of the package to demonstrate various capabilities.

Client SDK modules

These wrapper modules contain collections of convenience functions for daily operations of your NiFi and NiFi-Registry environment. They wrap and surface underlying data structures and calls to the full SDK swagger clients which are also included in the package.

Canvas

For interactions with the NiFi Canvas.

nipyapi.canvas.get_root_pg_id()[source]

Convenience function to return the UUID of the Root Process Group

Returns (str): The UUID of the root PG

nipyapi.canvas.recurse_flow(pg_id='root')[source]

Returns information about a Process Group and all its Child Flows. Recurses the child flows by appending each process group with a ‘nipyapi_extended’ parameter which contains the child process groups, etc.

Note: This previously used actual recursion, which broke on large NiFi environments; it now uses a task/list update approach.
Parameters:pg_id (str) – The Process Group UUID
Returns:enriched NiFi Flow object
Return type:(ProcessGroupFlowEntity)
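
The task/list approach mentioned in the note can be illustrated with a generic work-list traversal. This is only a sketch of the general technique, not the code used by recurse_flow: the function name flatten_groups and the plain-dict tree are hypothetical stand-ins for NiFi entities.

```python
from collections import deque


def flatten_groups(root):
    """Return the names of a nested group tree without using recursion."""
    names = []
    pending = deque([root])  # work list of groups still to visit
    while pending:
        group = pending.popleft()
        names.append(group["name"])
        # Queue the children instead of calling this function on each one
        pending.extend(group.get("children", []))
    return names


# Hypothetical stand-in for a canvas: plain dicts, not NiFi entities
canvas = {
    "name": "root",
    "children": [
        {"name": "ingest", "children": [{"name": "parse"}]},
        {"name": "export"},
    ],
}
print(flatten_groups(canvas))
```

Because the pending groups live in an explicit queue rather than on the call stack, the depth of nesting is not limited by Python's recursion limit.
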
nipyapi.canvas.get_flow(pg_id='root')[source]

Returns information about a Process Group and flow.

This surfaces the native implementation, for the recursed implementation see ‘recurse_flow’

Parameters:pg_id (str) – id of the Process Group to retrieve, defaults to the root process group if not set
Returns:The Process Group object
Return type:(ProcessGroupFlowEntity)
nipyapi.canvas.get_process_group_status(pg_id='root', detail='names')[source]

Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings

Note that there is also a ‘process group status’ command available, but it returns a subset of this data anyway, and this call is more useful

Parameters:
  • pg_id (str) – The UUID of the Process Group
  • detail (str) – ‘names’ or ‘all’; whether to return a simple dict of name:id pairings, or the full details. Defaults to ‘names’
Returns:

The Process Group Entity including the status

Return type:

(ProcessGroupEntity)

nipyapi.canvas.get_process_group(identifier, identifier_type='name', greedy=True)[source]

Filters the list of all process groups against a given identifier string occurring in a given identifier_type field.

Parameters:
  • identifier (str) – the string to filter the list for
  • identifier_type (str) – the field to filter on, set in config.py
  • greedy (bool) – True for partial match, False for exact match
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches
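
The greedy flag and the three possible return shapes can be pictured with a small stand-alone filter. This is a sketch under assumptions, not the library's filter code: filter_by_field is a hypothetical helper and the items are plain dicts rather than ProcessGroupEntity objects.

```python
def filter_by_field(items, identifier, field, greedy=True):
    """Return None, a single item, or a list, depending on the match count."""
    if greedy:
        # Partial match: the identifier may occur anywhere in the field
        matches = [i for i in items if identifier in i[field]]
    else:
        # Exact match: the field must equal the identifier
        matches = [i for i in items if identifier == i[field]]
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    return matches


# Made-up data for illustration only
groups = [{"name": "ingest"}, {"name": "ingest_raw"}, {"name": "export"}]
partial_hits = filter_by_field(groups, "ingest", "name")
exact_hit = filter_by_field(groups, "ingest", "name", greedy=False)
```

Callers of functions with this return convention should be prepared to handle all three shapes, for example by checking for None and for a list before using the result.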

nipyapi.canvas.list_all_process_groups(pg_id='root')[source]

Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas.

Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas

Parameters:pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
Returns:list[ProcessGroupEntity]
nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True)[source]

Deletes a given Process Group, with optional prejudice.

Parameters:
  • process_group (ProcessGroupEntity) – The target Process Group
  • force (bool) – Stop, purge and clean the target Process Group before deletion. Experimental.
  • refresh (bool) – Whether to refresh the state first
Returns:

The updated object state

Return type:

(ProcessGroupEntity)

nipyapi.canvas.schedule_process_group(process_group_id, scheduled)[source]

Start or Stop a Process Group and all components.

Note that this doesn’t guarantee that all components have started, as some may be in Invalid states.

Parameters:
  • process_group_id (str) – The UUID of the target Process Group
  • scheduled (bool) – True to start, False to stop
Returns:

True if successfully scheduled, False if not

Return type:

(bool)

nipyapi.canvas.create_process_group(parent_pg, new_pg_name, location, comment='')[source]

Creates a new Process Group with the given name under the provided parent Process Group at the given Location

Parameters:
  • parent_pg (ProcessGroupEntity) – The parent Process Group to create the new process group in
  • new_pg_name (str) – The name of the new Process Group
  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
  • comment (str) – Entry for the Comments field
Returns:

The new Process Group

Return type:

(ProcessGroupEntity)
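
Since create_process_group expects the parent as a ProcessGroupEntity rather than an ID, a typical call first looks up the parent. The sketch below combines the documented signatures of get_root_pg_id, get_process_group and create_process_group. The helper name ensure_process_group and the default location are hypothetical, and it assumes 'id' is an accepted identifier_type for get_process_group, as it is documented for get_controller. The canvas argument is meant to be the nipyapi.canvas module, passed in so the logic can be exercised without a running NiFi.

```python
def ensure_process_group(canvas, name, location=(400.0, 400.0)):
    """Return the Process Group called `name`, creating it under root if absent.

    `canvas` is expected to be the nipyapi.canvas module, or any object
    exposing the same three functions.
    """
    # Exact match, so 'demo' does not also match 'demo_old'
    existing = canvas.get_process_group(name, identifier_type='name', greedy=False)
    if existing is not None:
        return existing
    # Assumption: 'id' is a valid identifier_type for this lookup
    root_id = canvas.get_root_pg_id()
    root_pg = canvas.get_process_group(root_id, identifier_type='id')
    return canvas.create_process_group(root_pg, name, location)
```

Note that the exact-match lookup may still return a list if several Process Groups share the same name; this sketch returns that list unchanged.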

nipyapi.canvas.list_all_processors(pg_id='root')[source]

Returns a flat list of all Processors under the provided Process Group

Parameters:pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
Returns:list[ProcessorEntity]
nipyapi.canvas.list_all_processor_types()[source]

Produces the list of all available processor types in the NiFi instance

Returns:A native datatype containing the processors list
Return type:list(ProcessorTypesEntity)
nipyapi.canvas.get_processor_type(identifier, identifier_type='name')[source]

Gets the abstract object describing a Processor, or list thereof

Parameters:
  • identifier (str) – the string to filter the list for
  • identifier_type (str) – the field to filter on, set in config.py
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None)[source]

Instantiates a given processor on the canvas

Parameters:
  • parent_pg (ProcessGroupEntity) – The parent Process Group
  • processor (DocumentedTypeDTO) – The abstract processor type object to be instantiated
  • location (tuple[x, y]) – The location coordinates
  • name (Optional [str]) – The name for the new Processor
  • config (Optional [ProcessorConfigDTO]) – A configuration object for the new processor
Returns:

The new Processor

Return type:

(ProcessorEntity)

nipyapi.canvas.delete_processor(processor, refresh=True, force=False)[source]

Deletes a Processor from the canvas, with optional prejudice.

Parameters:
  • processor (ProcessorEntity) – The processor to delete
  • refresh (bool) – Whether to refresh the Processor state before action
  • force (bool) – Whether to stop, purge and remove connections to the Processor before deletion. Behavior may change in future releases.
Returns:

The updated ProcessorEntity

Return type:

(ProcessorEntity)

nipyapi.canvas.get_processor(identifier, identifier_type='name', greedy=True)[source]

Filters the list of all Processors against the given identifier string in the given identifier_type field

Parameters:
  • identifier (str) – The String to filter against
  • identifier_type (str) – The field to apply the filter to. Set in config.py
  • greedy (bool) – Whether to exact match (False) or partial match (True)
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True)[source]

Set a Processor to Start or Stop.

Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor starts

Parameters:
  • processor (ProcessorEntity) – The Processor to target
  • scheduled (bool) – True to start, False to stop
  • refresh (bool) – Whether to refresh the object before action
Returns:

True for success, False for failure

Return type:

(bool)
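
The "wait and see" behaviour described above is a form of polling with a timeout. The following is a generic sketch of that pattern, not the code used by schedule_processor; wait_until, its parameters and its default values are all hypothetical.

```python
import time


def wait_until(check, timeout=5.0, interval=0.5):
    """Poll `check()` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            # Give up: the state never reached the desired value in time
            return False
        time.sleep(interval)
```

A caller would pass a function that re-reads the component state and compares it with the desired one, and treat a False result as "instructed, but not confirmed".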

nipyapi.canvas.get_funnel(funnel_id)[source]

Gets a given Funnel by ID

nipyapi.canvas.update_processor(processor, update)[source]

Updates configuration parameters for a given Processor.

An example update would be: nifi.ProcessorConfigDTO(scheduling_period='3s')

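Parameters:
  • processor (ProcessorEntity) – The Processor to target for update
  • update (ProcessorConfigDTO) – The new configuration parameters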
Returns:

The updated ProcessorEntity

Return type:

(ProcessorEntity)

nipyapi.canvas.get_variable_registry(process_group, ancestors=True)[source]

Gets the contents of the variable registry attached to a Process Group

Parameters:
  • process_group (ProcessGroupEntity) – The Process Group to retrieve the Variable Registry from
  • ancestors (bool) – Whether to include the Variable Registries from ancestor (parent) Process Groups
Returns:

The Variable Registry

Return type:

(VariableRegistryEntity)

nipyapi.canvas.update_variable_registry(process_group, update, refresh=True)[source]

Updates one or more key:value pairs in the variable registry

Parameters:
  • process_group (ProcessGroupEntity) – The Process Group which has the Variable Registry to be updated
  • update (list[tuple]) – The variables to write to the registry
  • refresh (bool) – Whether to refresh the object revision before updating
Returns:

The created or updated Variable Registry Entries

Return type:

(VariableRegistryEntity)

nipyapi.canvas.purge_connection(con_id)[source]

EXPERIMENTAL Drops all FlowFiles in a given connection. Waits until the action is complete before returning.

Note that if upstream component isn’t stopped, more data may flow into the connection after this action.

Parameters:con_id (str) – The UUID of the Connection to be purged
Returns:The status reporting object for the drop request.
Return type:(DropRequestEntity)
nipyapi.canvas.purge_process_group(process_group, stop=False)[source]

EXPERIMENTAL Purges the connections in a given Process Group of FlowFiles, and optionally stops it first

Parameters:
  • process_group (ProcessGroupEntity) – Target Process Group
  • stop (Optional [bool]) – Whether to stop the Process Group before action
Returns:

Result set. A list of dicts of Connection IDs mapped to True or False for success of each connection

Return type:

(list[dict{ID: True|False}])
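
A result set of this shape can be scanned for connections that did not purge cleanly. The sketch below assumes only the documented return shape (a list of dicts mapping a Connection ID to True or False); the helper name failed_connections and the sample IDs are made up.

```python
def failed_connections(purge_result):
    """Return the Connection IDs whose purge was reported as unsuccessful."""
    failed = []
    for entry in purge_result:
        for con_id, succeeded in entry.items():
            if not succeeded:
                failed.append(con_id)
    return failed


# Made-up IDs in the documented list-of-dicts shape
sample = [{"con-1": True}, {"con-2": False}, {"con-3": True}]
print(failed_connections(sample))
```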

nipyapi.canvas.schedule_components(pg_id, scheduled, components=None)[source]

Changes the scheduled target state of a list of components within a given Process Group.

Note that this does not guarantee that components will be Started or Stopped afterwards, merely that they will have their scheduling updated.

Parameters:
  • pg_id (str) – The UUID of the parent Process Group
  • scheduled (bool) – True to start, False to stop
  • components (list[ComponentType]) – The list of Component Entities to schedule, e.g. ProcessorEntity objects
Returns:

True for success, False for not

Return type:

(bool)

nipyapi.canvas.get_bulletins()[source]

Retrieves current bulletins (alerts) from the Flow Canvas

Returns:The native datatype containing a list of bulletins
Return type:(ControllerBulletinsEntity)

nipyapi.canvas.get_bulletin_board()[source]

Retrieves the bulletin board object

Returns:The native datatype BulletinBoard object
Return type:(BulletinBoardEntity)
nipyapi.canvas.list_invalid_processors(pg_id='root', summary=False)[source]

Returns a flattened list of all Processors with Invalid Statuses

Parameters:
  • pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
  • summary (bool) – True to return just the list of relevant properties per Processor, False for the full listing
Returns:

list[ProcessorEntity]

nipyapi.canvas.list_sensitive_processors(pg_id='root', summary=False)[source]

Returns a flattened list of all Processors on the canvas which have sensitive properties that would need to be managed during deployment

Parameters:
  • pg_id (str) – The UUID of the Process Group to start from, defaults to the Canvas root
  • summary (bool) – True to return just the list of relevant properties per Processor, False for the full listing
Returns:

list[ProcessorEntity] or list(dict)

nipyapi.canvas.list_all_connections(pg_id='root', descendants=True)[source]

Lists all connections for a given Process Group ID

Parameters:
  • pg_id (str) – ID of the Process Group to retrieve Connections from
  • descendants (bool) – True to recurse child PGs, False to not
Returns:

List of ConnectionEntity objects

Return type:

(list)

nipyapi.canvas.create_connection(source, target, relationships=None, name=None)[source]

Creates a connection between two objects for the given relationships

Parameters:
  • source – Object to initiate the connection, e.g. ProcessorEntity
  • target – Object to terminate the connection, e.g. FunnelEntity
  • relationships (list) – list of strings of relationships to connect, may be collected from the object ‘relationships’ property (optional)
  • name (str) – Name for the Connection (optional), defaults to None
Returns:

The ConnectionEntity for the created connection

Return type:

(ConnectionEntity)

nipyapi.canvas.delete_connection(connection, purge=False)[source]

Deletes a connection, optionally purges it first

Parameters:
  • connection (ConnectionEntity) – Connection to delete
  • purge (bool) – True to Purge, Defaults to False
Returns:

the modified Connection

Return type:

(ConnectionEntity)

nipyapi.canvas.get_component_connections(component)[source]

Returns list of Connections related to a given Component, e.g. Processor

Parameters:component – Component Object to filter by, e.g. a ProcessorEntity
Returns:List of ConnectionEntity Objects
Return type:(list)
nipyapi.canvas.create_controller(parent_pg, controller, name=None)[source]

Creates a new Controller Service of the given Controller type in a given Process Group, with the given name

Parameters:
  • parent_pg (ProcessGroupEntity) – Target Parent PG
  • controller (DocumentedTypeDTO) – Type of Controller to create, found via the list_all_controller_types method
  • name (str[Optional]) – Name for the new Controller as a String
Returns:

(ControllerServiceEntity)

nipyapi.canvas.list_all_controllers(pg_id='root', descendants=True)[source]

Lists all controllers under a given Process Group, defaults to Root. Optionally recurses all child Process Groups as well

Parameters:
  • pg_id (str) – String of the ID of the Process Group to list from
  • descendants (bool) – True to recurse child PGs, False to not
Returns:

None, ControllerServiceEntity, or list(ControllerServiceEntity)

nipyapi.canvas.delete_controller(controller, force=False)[source]

Delete a Controller service, with optional prejudice

Parameters:
  • controller (ControllerServiceEntity) – Target Controller to delete
  • force (bool) – True to attempt to disable the Controller before deletion
Returns:

(ControllerServiceEntity)

nipyapi.canvas.update_controller(controller, update, refresh=True)[source]

Updates the Configuration of a Controller Service

Parameters:
  • controller (ControllerServiceEntity) – Target Controller to update
  • update (ControllerServiceDTO) – Controller Service configuration object containing the new config params and properties
  • refresh (bool) – True to refresh before applying
Returns:

(ControllerServiceEntity)

nipyapi.canvas.schedule_controller(controller, scheduled, refresh=False)[source]

Start/Enable or Stop/Disable a Controller Service

Parameters:
  • controller (ControllerServiceEntity) – Target Controller to schedule
  • scheduled (bool) – True to start, False to stop
  • refresh (bool) – Whether to refresh the component revision before execution
Returns:

(ControllerServiceEntity)

nipyapi.canvas.get_controller(identifier, identifier_type='name', bool_response=False, greedy=True)[source]

Retrieve a given Controller

Parameters:
  • identifier (str) – ID or Name of a Controller to find
  • identifier_type (str) – ‘id’ or ‘name’, defaults to name
  • bool_response (bool) – If True, will return False if the Controller is not found - useful when testing for deletion completion
  • greedy (bool) – True for partial match, False for exact match

Returns:

nipyapi.canvas.list_all_controller_types()[source]

Lists all Controller Service types available on the environment

Returns:list(DocumentedTypeDTO)
nipyapi.canvas.list_all_by_kind(kind, pg_id='root', descendants=True)[source]

Retrieves a list of all instances of a supported object type

Parameters:
  • kind (str) – one of input_ports, output_ports, funnels, controllers, connections, remote_process_groups
  • pg_id (str) – optional, ID of the Process Group to use as search base
  • descendants (bool) – optional, whether to collect child group info
Returns:

list of the Entity type of the kind, or single instance, or None

nipyapi.canvas.list_all_input_ports(pg_id='root', descendants=True)[source]

Convenience wrapper for list_all_by_kind for input ports

nipyapi.canvas.list_all_output_ports(pg_id='root', descendants=True)[source]

Convenience wrapper for list_all_by_kind for output ports

nipyapi.canvas.list_all_funnels(pg_id='root', descendants=True)[source]

Convenience wrapper for list_all_by_kind for funnels

nipyapi.canvas.list_all_remote_process_groups(pg_id='root', descendants=True)[source]

Convenience wrapper for list_all_by_kind for remote process groups
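
The relationship between list_all_by_kind and its convenience wrappers can be pictured as a function with its first argument pre-filled. This is an illustration of the wrapper idea only, not how nipyapi defines these functions: list_by_kind_stub, funnels_lister and input_ports_lister are hypothetical names, and the stub merely echoes its arguments instead of querying NiFi.

```python
from functools import partial


def list_by_kind_stub(kind, pg_id='root', descendants=True):
    """Stand-in that only echoes its arguments; a real call would query NiFi."""
    return (kind, pg_id, descendants)


# Each wrapper fixes `kind` and leaves the other arguments to the caller
funnels_lister = partial(list_by_kind_stub, 'funnels')
input_ports_lister = partial(list_by_kind_stub, 'input_ports')

print(funnels_lister())
print(input_ports_lister(pg_id='abc', descendants=False))
```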

nipyapi.canvas.delete_funnel(funnel, refresh=True)[source]

Deletes a Funnel Object

Parameters:
  • funnel (FunnelEntity) – The Funnel to delete
  • refresh (bool) – Whether to refresh the object state before execution
Returns:

(FunnelEntity) Deleted FunnelEntity reference

nipyapi.canvas.get_remote_process_group(rpg_id, summary=False)[source]

Fetch a remote process group object, with optional summary of just ports

nipyapi.canvas.update_process_group(pg, update)[source]

Updates a given Process Group.

Parameters:
  • pg (ProcessGroupEntity) – The Process Group to target for update
  • update (dict) – key:value pairs to update
Returns:

The updated ProcessGroupEntity

Return type:

(ProcessGroupEntity)

nipyapi.canvas.create_funnel(pg_id, position=None)[source]

Creates a Funnel Object

Parameters:
  • pg_id (str) – ID of the parent Process Group
  • position (tuple[int, int]) – Position on canvas
Returns:

(FunnelEntity) Created Funnel

nipyapi.canvas.create_remote_process_group(target_uris, transport='RAW', pg_id='root', position=None)[source]

Creates a new Remote Process Group with given parameters

Parameters:
  • target_uris (str) – Comma separated list of target URIs
  • transport (str) – optional, RAW or HTTP
  • pg_id (str) – optional, UUID of parent Process Group for remote process group
  • position (tuple) – optional, tuple of location ints
Returns:

(RemoteProcessGroupEntity)

nipyapi.canvas.delete_remote_process_group(rpg, refresh=True)[source]

Deletes a given remote process group

Parameters:
  • rpg (RemoteProcessGroupEntity) – Remote Process Group to remove
  • refresh (bool) – Whether to refresh the object before action
Returns:

(RemoteProcessGroupEntity)

nipyapi.canvas.set_remote_process_group_transmission(rpg, enable=True, refresh=True)[source]

Enable or Disable Transmission for an RPG

Parameters:
  • rpg (RemoteProcessGroupEntity) – The remote process group to modify
  • enable (bool) – True to enable, False to disable
  • refresh (bool) – Whether to refresh the object before action

Returns:

nipyapi.canvas.get_pg_parents_ids(pg_id)[source]

Retrieve the ids of the parent Process Groups.

Parameters:pg_id (str) – Process group id
Returns:(list) List of ids of the input PG parents
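
Collecting parent ids amounts to walking up the hierarchy until there is no further parent. The sketch below shows that walk on a plain dict mapping each id to its parent id. The helper parent_ids and the sample hierarchy are hypothetical, and the ordering of the list returned by get_pg_parents_ids is not stated here, so the nearest-parent-first order is an assumption of this sketch.

```python
def parent_ids(pg_id, parent_of):
    """Walk upwards from `pg_id`, collecting each ancestor id until the root."""
    parents = []
    current = parent_of.get(pg_id)
    while current is not None:
        parents.append(current)
        current = parent_of.get(current)
    return parents


# Made-up hierarchy: root -> a -> b -> c
parent_of = {"root": None, "a": "root", "b": "a", "c": "b"}
print(parent_ids("c", parent_of))
```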

Config

A set of defaults and parameters used elsewhere in the project. Also provides a handy link to the low-level client SDK configuration singleton objects.

Security

Secure connectivity management for NiPyApi

nipyapi.security.create_service_user(identity, service='nifi', strict=True)[source]

Attempts to create a user with the provided identity in the given service

Parameters:
  • identity (str) – Identity string for the user
  • service (str) – ‘nifi’ or ‘registry’
  • strict (bool) – If Strict, will error if user already exists
Returns:

The new (User) or (UserEntity) object

nipyapi.security.create_service_user_group(identity, service='nifi', users=None, strict=True)[source]

Attempts to create a user group with the provided identity and member users in the given service

Parameters:
  • identity (str) – Identity string for the user group
  • service (str) – ‘nifi’ or ‘registry’
  • users (list) – A list of nifi.UserEntity or registry.User belonging to the group
  • strict (bool) – Whether to throw an error on already exists
Returns:

The new (UserGroup) or (UserGroupEntity) object

nipyapi.security.set_service_auth_token(token=None, token_name='tokenAuth', service='nifi')[source]

Helper method to set the auth token correctly for the specified service

Parameters:
  • token (Optional[str]) – The token to set. Defaults to None.
  • token_name (str) – the api_key field name to set the token to. Defaults to ‘tokenAuth’
  • service (str) – ‘nifi’ or ‘registry’, the service to set
Returns:

True on success, False if token not set

Return type:

(bool)

nipyapi.security.service_logout(service='nifi')[source]

Logs out from the service by resetting the token

Parameters:service (str) – ‘nifi’ or ‘registry’; the target service
Returns:True if access removed, False if still set
Return type:(bool)
nipyapi.security.get_service_access_status(service='nifi', bool_response=False)[source]

Gets the access status for the current session

Parameters:
  • service (str) – A String of ‘nifi’ or ‘registry’ to indicate which service to check status for
  • bool_response (bool) – If True, the function will return False on hitting an Error instead of raising it. Useful for connection testing.
Returns:

(bool) if bool_response, else the Service Access Status of the User

nipyapi.security.add_user_to_access_policy(user, policy, service='nifi', refresh=True, strict=True)[source]

Attempts to add the given user object to the given access policy

Parameters:
  • user (User) or (UserEntity) – User object to add
  • policy (AccessPolicyEntity) or (AccessPolicy) – Access Policy object
  • service (str) – ‘nifi’ or ‘registry’ to identify the target service
  • refresh (bool) – Whether to refresh the policy object before submit
  • strict (bool) – If True, will error if the user is already present; if False, an already existing entry is ignored
Returns:

Updated Policy object

nipyapi.security.update_access_policy(policy, service='nifi')[source]

Applies an updated access policy to the service indicated

Parameters:
  • policy (PolicyEntity) – The policy object to submit
  • service (str) – ‘nifi’ or ‘registry’ to indicate the target service
Returns:

The updated policy if successful

Return type:

(PolicyEntity)

nipyapi.security.get_access_policy_for_resource(resource, action, r_id=None, service='nifi', auto_create=False)[source]

Attempts to retrieve the access policy for a given resource and action, and optionally resource_id if targeting NiFi. Optionally creates the policy if it doesn’t already exist

Parameters:
  • resource (str) – A valid resource in the target service
  • action (str) – A valid action, typically ‘read’, ‘write’ or ‘delete’
  • r_id (Optional[str]) – The UUID of the resource, valid only if targeting NiFi resources
  • service (str) – Which service to target, typically ‘nifi’ or ‘registry’
  • auto_create (bool) – Whether to create the targeted policy if it doesn’t already exist
Returns:

The relevant AccessPolicy object

nipyapi.security.create_access_policy(resource, action, r_id=None, service='nifi')[source]

Creates an access policy for the given resource, action and optionally resource_id for NiFi.

Parameters:
  • resource (str) – a valid resource type for this service, e.g. ‘bucket’
  • action (str) – a valid action type for this service, typically ‘read’, ‘write’ or ‘delete’
  • r_id (optional[str]) – if NiFi, the resource ID of the resource
  • service (str) – the service to target
Returns:

An access policy object for that service

nipyapi.security.list_service_users(service='nifi')[source]

Lists all users of a given service, takes a service name as a string

nipyapi.security.get_service_user(identifier, identifier_type='identity', service='nifi')[source]

Gets the unique user matching to the given identifier and type.

Parameters:
  • identifier (str) – the string to search for
  • identifier_type (str) – the field to search in
  • service (str) – the name of the service
Returns:

None if no match, else single object

nipyapi.security.set_service_ssl_context(service='nifi', ca_file=None, client_cert_file=None, client_key_file=None, client_key_password=None)[source]

Create an SSLContext for connecting over https to a secured NiFi or NiFi-Registry instance.

This method can be used to create an SSLContext for two-way TLS in which a client cert is used by the service to authenticate the client.

This method can also be used for one-way TLS in which the client verifies the server’s certificate, but authenticates using a different form of credentials, such as LDAP username/password.

If you are using one-way TLS with a certificate signed by a root CA trusted by your system/platform, this step is not necessary as the default TLS-handshake should “just work.”

Parameters:
  • service (str) – ‘nifi’ or ‘registry’ to indicate which service config to set the ssl context to
  • ca_file (str) – A PEM file containing certs for the root CA(s) for the NiFi Registry server
  • client_cert_file (str) – A PEM file containing the public certificates for the user/client identity
  • client_key_file (str) – An encrypted (password-protected) PEM file containing the client’s secret key
  • client_key_password (str) – The password to decrypt the client_key_file
Returns:

(None)
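
The one-way and two-way TLS cases described above map onto Python's standard ssl module roughly as follows. This is a stdlib-only sketch and not the body of set_service_ssl_context: the helper build_client_context is hypothetical, although its parameter names mirror the ones documented above.

```python
import ssl


def build_client_context(ca_file=None, client_cert_file=None,
                         client_key_file=None, client_key_password=None):
    """Build an SSLContext for one-way or two-way TLS using only the stdlib."""
    # Verifies the server certificate; uses the system CAs if ca_file is None
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_file
    )
    if client_cert_file is not None:
        # Two-way TLS: also present a client certificate to the server
        context.load_cert_chain(
            certfile=client_cert_file,
            keyfile=client_key_file,
            password=client_key_password,
        )
    return context
```

Calling the helper with no arguments gives the "just works" one-way case that relies on CAs already trusted by the platform; passing ca_file covers a private root CA, and passing the client certificate and key files covers two-way TLS.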

nipyapi.security.add_user_group_to_access_policy(user_group, policy, service='nifi', refresh=True)[source]

Attempts to add the given user group object to the given access policy

Parameters:
  • user_group (UserGroup) or (UserGroupEntity) – User group object to add
  • policy (AccessPolicyEntity) or (AccessPolicy) – Access Policy object
  • service (str) – ‘nifi’ or ‘registry’ to identify the target service
  • refresh (bool) – Whether to refresh the policy object before submission
Returns:

Updated Policy object

nipyapi.security.bootstrap_security_policies(service, user_identity=None, group_identity=None)[source]

Creates a default security context within NiFi or NiFi-Registry

Parameters:
  • service (str) – ‘nifi’ or ‘registry’ to indicate which service
  • user_identity – a service user to establish in the security context
  • group_identity – a service group to establish in the security context
Returns:

None

nipyapi.security.service_login(service='nifi', username=None, password=None, bool_response=False, auth_type='token')[source]

Login to the currently configured NiFi or NiFi-Registry server.

Login requires a secure connection over https. Prior to calling this method, the host must be specified and the SSLContext should be configured (if necessary).

Successful login will result in a generated token (JWT) being cached in the api_client config that will be passed in all future REST API calls. To clear that token, call service_logout.

The token is temporary and will expire after a duration set by the server. After a token expires, you must call this method again to generate a new token.

Parameters:
  • service (str) – ‘nifi’ or ‘registry’; the service to login to
  • username (str) – The username to submit
  • password (str) – The password to use
  • bool_response (bool) – If True, the function will return False instead of an error. Useful for connection testing.
  • auth_type (str) – token (default) or basic
Returns:

True if successful, False or an Error if not. See bool_response

Return type:

(bool)
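
Because a successful login caches a token until service_logout is called, it can be convenient to pair the two calls around a unit of work. The sketch below relies only on the documented signatures of service_login and service_logout. The helper with_session is hypothetical, and the security argument is meant to be the nipyapi.security module, passed in so the flow can be tried against a stand-in. Setting the host and SSLContext beforehand is left out.

```python
def with_session(security, username, password, action, service='nifi'):
    """Log in, run `action()`, and always log out afterwards."""
    # bool_response=True makes a failed login return False instead of raising
    if not security.service_login(service, username, password, bool_response=True):
        raise RuntimeError('login to %s failed' % service)
    try:
        return action()
    finally:
        # Clear the cached token even if the action raised
        security.service_logout(service)
```

If the token expires during a long-running action, the calls inside it will start failing and the login has to be repeated, as described above; this sketch does not attempt to refresh the token.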

nipyapi.security.remove_service_user(user, service='nifi', strict=True)[source]

Removes a given User from the given Service

Parameters:
  • user – [(nifi.UserEntity), (registry.User)] Target User object
  • service (str) – ‘nifi’ or ‘registry’
  • strict (bool) – Whether to throw an error if User not found
Returns:

Updated User Entity or None

nipyapi.security.list_service_user_groups(service='nifi')[source]

Returns list of service user groups for a given service

Parameters:service (str) – ‘nifi’ or ‘registry’
Returns:[(nifi.UserGroupEntity, registry.UserGroup)]
nipyapi.security.get_service_user_group(identifier, identifier_type='identity', service='nifi')[source]

Gets the unique group matching to the given identifier and type.

Parameters:
  • identifier (str) – the string to search for
  • identifier_type (str) – the field to search in, identity or id
  • service (str) – the name of the service
Returns:

None if no match, else single object

nipyapi.security.remove_service_user_group(group, service='nifi', strict=True)[source]

Removes a given User Group from the given Service

Parameters:
  • group – [(nifi.UserGroupEntity), (registry.UserGroup)] Target User Group object
  • service (str) – ‘nifi’ or ‘registry’
  • strict (bool) – Whether to throw an error if User Group not found
Returns:

Updated User Group or None

System

For system and cluster level functions interacting with the underlying NiFi Services

nipyapi.system.get_system_diagnostics()[source]

Returns the NiFi System Diagnostics page

Returns (json):

nipyapi.system.get_cluster()[source]

EXPERIMENTAL Returns the contents of the NiFi cluster

Returns (json):

nipyapi.system.get_node(nid)[source]

Returns the cluster node information

Parameters:nid (str) – The UUID of the Node to target

Returns:

nipyapi.system.get_nifi_version_info()[source]

Returns the version information of the connected NiFi instance

Returns (VersionInfoDTO):

Templates

For managing flow deployments

nipyapi.templates.list_all_templates(native=True)[source]

Gets a list of all templates on the canvas

Returns:A list of TemplateEntity objects
Return type:(list[TemplateEntity])
nipyapi.templates.get_template_by_name(name)[source]

DEPRECATED Returns a specific template by name, if it exists.

Note: This function is replaced by get_template

Parameters:name (str) – The Name of the template, exact match required
Returns:(TemplateEntity)
nipyapi.templates.deploy_template(pg_id, template_id, loc_x=0.0, loc_y=0.0)[source]

Instantiates a given template request in a given process group

Parameters:
  • pg_id (str) – The UUID of the Process Group to deploy into
  • template_id (str) – The UUID of the Template to deploy. Note that the Template must already be uploaded and available to the target Process Group
  • loc_x (float) – The X coordinate to deploy the Template at. Default(0.0)
  • loc_y (float) – The Y coordinate to deploy the Template at. Default(0.0)
Returns:

The FlowEntity of the Process Group with the deployed template

Return type:

(FlowEntity)

nipyapi.templates.upload_template(pg_id, template_file)[source]

Uploads a given template xml from the file system to the given Process Group

Parameters:
  • pg_id (str) – The UUID of the Process Group to upload to
  • template_file (str) – The path including filename to the template file
Returns:

The new Template object

Return type:

(TemplateEntity)

nipyapi.templates.create_pg_snippet(pg_id)[source]

Creates a snippet of the targeted process group, and returns the object ready to be turned into a Template

Parameters:pg_id – UUID of the process Group to snippet
Returns:The Snippet Object
Return type:(SnippetEntity)
nipyapi.templates.create_template(pg_id, name, desc='')[source]

Creates a Template from a Process Group

Parameters:
  • pg_id (str) – The UUID of the target Process Group
  • name (str) – The name for the new Template. Must be unique
  • desc (optional[str]) – The description for the new Template
Returns:

The newly created Template

Return type:

(TemplateEntity)
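
Since the Template name must be unique, a caller may want to check for an existing Template before creating one. The sketch below combines the documented signatures of get_template and create_template. The helper create_template_once is hypothetical, and the templates argument is meant to be the nipyapi.templates module, passed in so the logic can be tried against a stand-in.

```python
def create_template_once(templates, pg_id, name, desc=''):
    """Return the Template called `name`, creating it from `pg_id` if needed."""
    # Exact match on the name field, so similarly named Templates are ignored
    existing = templates.get_template(name, identifier_type='name', greedy=False)
    if existing is not None:
        return existing
    return templates.create_template(pg_id, name, desc)
```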

nipyapi.templates.delete_template(t_id)[source]

Deletes a Template

Parameters:t_id (str) – UUID of the Template to be deleted
Returns:The updated Template object
nipyapi.templates.export_template(t_id, output='string', file_path=None)[source]

Exports a given Template as either a string or a file.

Note that to reimport the Template it must be a file

Parameters:
  • t_id (str) – The UUID of the Template to export
  • output (str) – ‘string’ or ‘file’ to set the export action
  • file_path (Optional [str]) – The full path including filename to write the Template export to
Returns:

A String representation of the exported Template XML. Note that this may not be utf-8 encoded.

Return type:

(str)

nipyapi.templates.get_template(identifier, identifier_type='name', greedy=False)[source]

Filters the list of all Templates for a given string in a given field. Note that filters are configured in config.py

Parameters:
  • identifier (str) – The string to filter on
  • identifier_type (str) – The identifier of the field to filter on
  • greedy (bool) – True for greedy match, False for exact match
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

Utils

Convenience utility functions for NiPyApi, not really intended for external use

nipyapi.utils.dump(obj, mode='json')[source]

Dumps a native datatype object to json or yaml, defaults to json

Parameters:
  • obj (varies) – The native datatype object to serialise
  • mode (str) – ‘json’ or ‘yaml’, the supported export modes

Returns (str): The serialised object

nipyapi.utils.load(obj, dto=None)[source]

Loads a serialised object back into native datatypes, and optionally imports it back into the native NiFi DTO

Warning: Using this on objects not produced by this Package may have unintended results! While efforts have been made to ensure that unsafe loading is not possible, no stringent security testing has been completed.

Parameters:
  • obj (dict, list) – The serialised object to import
  • dto (Optional [tuple{str, str}]) – A Tuple describing the service and object that should be constructed
Returns: Either the loaded object in native Python datatypes, or the constructed native datatype object
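
The dump and load pair above describes a serialise/deserialise round trip. The following is a minimal sketch of that idea using only the Python standard library. The names dump_sketch and load_sketch are hypothetical stand-ins, not the package's functions, and only the 'json' mode is covered.

```python
import json


def dump_sketch(obj, mode="json"):
    """Serialise a native Python object to a string (json only in this sketch)."""
    if mode != "json":
        raise ValueError("this sketch only covers mode='json'")
    return json.dumps(obj, sort_keys=True, indent=4)


def load_sketch(serialised):
    """Load a serialised string back into native Python datatypes."""
    # json.loads only builds plain dicts, lists, strings and numbers,
    # so it cannot construct arbitrary objects from the input.
    return json.loads(serialised)


# Hypothetical sample data, used only to show the round trip
flow_meta = {"name": "my_flow", "version": 3, "tags": ["demo", "test"]}
encoded = dump_sketch(flow_meta)
decoded = load_sketch(encoded)
```

Here decoded compares equal to flow_meta. Rebuilding a NiFi DTO from the loaded data, as the dto parameter describes, is a separate step that this sketch does not attempt.
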
nipyapi.utils.fs_read(file_path)[source]

Convenience function to read an Object from a FilePath

Parameters:file_path (str) – The Full path including filename to read from

Returns: The object that was read

nipyapi.utils.fs_write(obj, file_path)[source]

Convenience function to write an Object to a FilePath

Parameters:
  • obj (varies) – The Object to write out
  • file_path (str) – The Full path including filename to write to

Returns: The object that was written

nipyapi.utils.filter_obj(obj, value, key, greedy=True)[source]

Implements a custom filter method because native datatypes don’t have consistently named or located fields.

Note that each object used by this function must be registered with identifier_types and identifiers in config

Parameters:
  • obj (varies) – the NiFi or NiFi-Registry object to filter on
  • value (str) – the String value to look for
  • key (str) – the object key to filter against
  • greedy (bool) – If True, the value will be matched anywhere in the string; if False, it will require an exact match

Returns: None if 0 matches, list if > 1, single Object entity if ==1
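
The return convention described above (None, a single object, or a list) and the greedy flag can be illustrated with a small stand-alone sketch. It assumes plain dicts as the objects and a hypothetical function name filter_sketch. The real function works on NiFi and NiFi-Registry objects registered in config, which this sketch does not model.

```python
def filter_sketch(items, value, key, greedy=True):
    """Filter a list of dicts on one key, mimicking the documented return shape."""
    if greedy:
        # Greedy: the value may appear anywhere inside the field
        matches = [item for item in items if value in item[key]]
    else:
        # Exact: the field must equal the value
        matches = [item for item in items if item[key] == value]
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    return matches


# Hypothetical sample data
templates = [
    {"name": "ingest", "id": "a1"},
    {"name": "ingest_v2", "id": "b2"},
    {"name": "export", "id": "c3"},
]
```

Because the result type varies with the number of matches, callers generally need to check for None and for a list before using the result as a single object.
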

nipyapi.utils.wait_to_complete(test_function, *args, **kwargs)[source]

Implements a basic retry loop for a given function which is capable of a True|False output

Parameters:
  • test_function – Function which returns a bool once the target state is reached
  • delay (int) – The number of seconds between each attempt, defaults to config.short_retry_delay
  • max_wait (int) – the maximum number of seconds before issuing a Timeout, defaults to config.short_max_wait
  • *args – Any args to pass through to the test function
  • **kwargs – Any Keyword Args to pass through to the test function

Returns (bool): True for success, False for not
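
A polling loop of the kind described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: wait_sketch is a hypothetical name, delay and max_wait are taken as keyword-only arguments with arbitrary defaults, and a timeout is signalled by raising TimeoutError.

```python
import time


def wait_sketch(test_function, *args, delay=1.0, max_wait=5.0, **kwargs):
    """Call test_function repeatedly until it returns a truthy value or time runs out."""
    deadline = time.monotonic() + max_wait
    while True:
        if test_function(*args, **kwargs):
            return True
        if time.monotonic() >= deadline:
            # Assumption: a timeout is reported by raising an exception
            raise TimeoutError("target state not reached within max_wait")
        time.sleep(delay)
```

The test function is always tried at least once, and the sleep happens only after a failed attempt, so a condition that is already met returns immediately.
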

nipyapi.utils.is_endpoint_up(endpoint_url)[source]

Tests if a URL is available for requests

Parameters:endpoint_url (str) – The URL to test

Returns (bool): True for a 200 response, False for not

nipyapi.utils.set_endpoint(endpoint_url, ssl=False, login=False)[source]

EXPERIMENTAL

Sets the endpoint when switching between instances of NiFi or other projects. Not tested extensively with secured instances.

Parameters:
  • endpoint_url (str) – The URL to set as the endpoint. Auto-detects the relevant service, e.g. ‘http://localhost:18080/nifi-registry-api’
  • ssl (bool) – Whether to use the default security context in nipyapi.config to authenticate if a secure URL is detected
  • login (bool) – Whether to attempt login using the default credentials in config; requires ssl to be set

Returns (bool): True for success, False for not
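
One way to picture the service auto-detection mentioned above is a check on the API path in the URL. The sketch below assumes that detection keys off the path segment, which this document does not confirm. The function name detect_service and the 'nifi-api' path are assumptions made for illustration.

```python
def detect_service(endpoint_url):
    """Guess which service a URL points at from its API path (illustrative only)."""
    # Assumption: a Registry URL contains 'nifi-registry-api'
    if "nifi-registry-api" in endpoint_url:
        return "registry"
    # Assumption: a NiFi URL contains 'nifi-api'
    if "nifi-api" in endpoint_url:
        return "nifi"
    raise ValueError("could not detect the service from: " + endpoint_url)
```

With the example URL from the parameter description, detect_service('http://localhost:18080/nifi-registry-api') returns 'registry'.
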

nipyapi.utils.start_docker_containers(docker_containers, network_name='demo')[source]

Deploys a list of DockerContainers on a given network

Parameters:
  • docker_containers (list[DockerContainer]) – list of DockerContainers to start
  • network_name (str) – The name of the Docker Bridge Network to get or create for the Docker Containers

Returns: Nothing

class nipyapi.utils.DockerContainer(name=None, image_name=None, image_tag=None, ports=None, env=None, volumes=None, test_url=None, endpoint=None)[source]

Bases: object

Helper class for Docker container automation without using Ansible

get_container()[source]
get_test_url_status()[source]

Checks if a URL is available

Returns: status code if available, String ‘ConnectionError’ if not

set_container(container)[source]
nipyapi.utils.infer_object_label_from_class(obj)[source]
Returns the expected STRING label for an object class required by certain functions.
Parameters:obj – The object to infer the name of
Returns:str of the relevant name, or raises an AssertionError
nipyapi.utils.bypass_slash_encoding(service, bypass)[source]

Instructs the API Client to bypass encoding the ‘/’ character

Parameters:
  • service (str) – ‘nifi’ or ‘registry’
  • bypass (bool) – True will not encode ‘/’ in fields via API calls
Returns:

None

Versioning

For interactions with the NiFi Registry Service and related functions

nipyapi.versioning.create_registry_client(name, uri, description)[source]

Creates a Registry Client in the NiFi Controller Services

Parameters:
  • name (str) – The name of the new Client
  • uri (str) – The URI for the connection, such as ‘http://registry:18080’
  • description (str) – A description for the Client
Returns:

The new registry client object

Return type:

(RegistryClientEntity)

nipyapi.versioning.list_registry_clients()[source]

Lists the available Registry Clients in the NiFi Controller Services

Returns:(list[RegistryClientEntity]) objects
nipyapi.versioning.delete_registry_client(client, refresh=True)[source]

Deletes a Registry Client from the list of NiFi Controller Services

Parameters:
  • client (RegistryClientEntity) – The client to delete
  • refresh (bool) – Whether to refresh the object before action
Returns:

The updated client object

Return type:

(RegistryClientEntity)

nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]

Filters the Registry clients to a particular identifier

Parameters:
  • identifier (str) – the filter string
  • identifier_type (str) – the parameter to filter on
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.list_registry_buckets()[source]

Lists all available Buckets in the NiFi Registry

Returns:(list[Bucket]) objects
nipyapi.versioning.create_registry_bucket(name)[source]

Creates a new Registry Bucket

Parameters:name (str) – name for the bucket, must be unique in the Registry
Returns:The new Bucket object
Return type:(Bucket)
nipyapi.versioning.delete_registry_bucket(bucket)[source]

Removes a bucket from the NiFi Registry

Parameters:bucket (Bucket) – the Bucket object to remove
Returns:The updated Bucket object
Return type:(Bucket)
nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name')[source]

Filters the Bucket list to a particular identifier

Parameters:
  • identifier (str) – the filter string
  • identifier_type (str) – the param to filter on
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True, force=False)[source]

Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow

Parameters:
  • process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version
  • registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to
  • bucket (Bucket) – the Bucket on the NiFi Registry to save to
  • flow_name (str) – A name for the VersionedFlow in the Bucket. Note that you need either a name for a new VersionedFlow, or the ID of an existing one to save a new version
  • flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow
  • comment (str) – A comment for the version commit
  • desc (str) – A description of the VersionedFlow
  • refresh (bool) – Whether to refresh the object revisions before action
  • force (bool) – Whether to Force Commit, or just regular Commit
Returns:

(VersionControlInformationEntity)

nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]

List of all Flows in a given NiFi Registry Bucket

Parameters:bucket_id (str) – The UUID of the Bucket to fetch from
Returns:(list[VersionedFlow]) objects
nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name')[source]

Filters the Flows in a Bucket against a particular identifier

Parameters:
  • bucket_id (str) – UUID of the Bucket to filter against
  • identifier (str) – The string to filter on
  • identifier_type (str) – The param to check
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]

Gets the most recent version of a VersionedFlowSnapshot from a bucket

Parameters:
  • bucket_id (str) – the UUID of the Bucket containing the flow
  • flow_id (str) – the UUID of the VersionedFlow to be retrieved
Returns:

(VersionedFlowSnapshot)

nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]

Changes a versioned flow to the specified version, or the latest version

Parameters:
  • process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change
  • target_version (Optional [None, Int]) – Either None to move to the latest available version, or Int of the version number to move to
Returns:

True if successful, False if not

Return type:

(bool)

nipyapi.versioning.get_version_info(process_group)[source]

Gets the Version Control information for a particular Process Group

Parameters:process_group (ProcessGroupEntity) – the ProcessGroup to work with
Returns:(VersionControlInformationEntity)
nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]

Creates a new VersionedFlow stub in NiFi Registry. Can be used to write VersionedFlow information to without using a NiFi Process Group directly

Parameters:
  • bucket_id (str) – UUID of the Bucket to write to
  • flow_name (str) – Name for the new VersionedFlow object
  • flow_desc (Optional [str]) – Description for the new VersionedFlow object
  • flow_type (Optional [str]) – Type of the VersionedFlow, should be ‘Flow’
Returns:

(VersionedFlow)

nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]

EXPERIMENTAL

Writes a FlowSnapshot into a VersionedFlow as a new version update

Note that this differs from save_flow_ver which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.

Parameters:
  • flow (VersionedFlowObject) – the VersionedFlow object to write to
  • flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow
  • refresh (bool) – Whether to refresh the object status before actioning
Returns:

The new (VersionedFlowSnapshot)

nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]

Retrieves the latest, or a specific, version of a Flow

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow
  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
  • version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
  • export (bool) – True to get the raw json object from the server for export, False to get the native DataType
Returns:

The VersionedFlowSnapshot if export=False, or the raw json otherwise

Return type:

(VersionedFlowSnapshot)

WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135, which means you sometimes can’t trust the version count

nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]

Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow
  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
  • version (Optional [None, Str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
  • file_path (str) – The path and filename to write to. Defaults to None which returns the serialised obj
  • mode (str) – ‘json’ or ‘yaml’ to specify the encoding format
Returns:

(str) of the encoded Snapshot

nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]

Imports a given encoded_flow version into the bucket and flow described. May optionally be passed a file to read the encoded flow_contents from.

Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.

Parameters:
  • bucket_id (str) – UUID of the bucket to write the encoded_flow version
  • encoded_flow (Optional [str]) – The encoded flow to import; if not specified file_path is read from.
  • file_path (Optional [str]) – The file path to read the encoded flow from; if not specified, encoded_flow is read from.
  • flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.
  • flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.
Returns:

The new (VersionedFlowSnapshot)
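
The note above says only one of each argument pair should be specified. A minimal sketch of such a check follows. It reads the note as "exactly one of each pair", which is an interpretation, and check_import_args is a hypothetical helper, not part of the package.

```python
def check_import_args(encoded_flow=None, file_path=None,
                      flow_name=None, flow_id=None):
    """Validate the two either/or argument pairs and report which were chosen."""
    # Both set or both unset are rejected; exactly one must be provided
    if (encoded_flow is None) == (file_path is None):
        raise ValueError("specify exactly one of encoded_flow or file_path")
    if (flow_name is None) == (flow_id is None):
        raise ValueError("specify exactly one of flow_name or flow_id")
    source = "encoded_flow" if encoded_flow is not None else "file_path"
    target = "new flow" if flow_name is not None else "existing flow"
    return source, target
```

For example, check_import_args(file_path='/tmp/flow.json', flow_name='copy') returns ('file_path', 'new flow'), while passing both flow_name and flow_id raises ValueError. The path and name in this example are made up.
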

nipyapi.versioning.list_flow_versions(bucket_id, flow_id, registry_id=None, service='registry')[source]

EXPERIMENTAL List all the versions of a given Flow in a given Bucket

Parameters:
  • bucket_id (str) – UUID of the bucket holding the flow to be enumerated
  • flow_id (str) – UUID of the flow in the bucket to be enumerated
  • registry_id (str) – UUID of the registry client linking the bucket, only required if requesting flows via NiFi instead of directly from Registry
  • service (str) – Accepts ‘nifi’ or ‘registry’, indicating which service to query
Returns:

list(VersionedFlowSnapshotMetadata) or (VersionedFlowSnapshotMetadataSetEntity)

nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]

Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.

Parameters:
  • parent_id (str) – The ID of the parent Process Group to create the new process group in.
  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
  • bucket_id (str) – ID of the bucket containing the versioned flow to deploy.
  • reg_client_id (str) – ID of the registry client connection to use.
  • flow_id (str) – ID of the versioned flow to deploy.
  • version (Optional [int,str]) – version to deploy, if not provided latest version will be deployed.
Returns:

(ProcessGroupEntity) of the newly deployed Process Group
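
The lookup functions in this section can be chained to feed deploy_flow_version. The sketch below shows one possible arrangement. It takes the imported nipyapi module as the api argument so that the function can also be exercised with a stand-in. The attribute names identifier (on Bucket and VersionedFlow) and id (on the registry client entity) are assumptions, and deploy_latest and _single are hypothetical helpers.

```python
def _single(result, what):
    """Require a unique match; the getters return None or a list otherwise."""
    if result is None or isinstance(result, list):
        raise ValueError("expected exactly one " + what)
    return result


def deploy_latest(api, bucket_name, flow_name, client_name, location=(0.0, 0.0)):
    """Deploy the latest version of a named flow under the root Process Group."""
    bucket = _single(api.versioning.get_registry_bucket(bucket_name), "bucket")
    flow = _single(
        api.versioning.get_flow_in_bucket(bucket.identifier, flow_name), "flow"
    )
    client = _single(
        api.versioning.get_registry_client(client_name), "registry client"
    )
    # version is left unset so the latest version is deployed
    return api.versioning.deploy_flow_version(
        parent_id=api.canvas.get_root_pg_id(),
        location=location,
        bucket_id=bucket.identifier,
        flow_id=flow.identifier,
        reg_client_id=client.id,
    )
```

A call might look like deploy_latest(nipyapi, 'my_bucket', 'my_flow', 'my_client'), with all three names being placeholders for objects that already exist in the target environment.
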