Demos

These modules use functionality from the rest of the package to demonstrate various capabilities.

Client SDK modules

These wrapper modules contain collections of convenience functions for daily operations of your NiFi and NiFi-Registry environment. They wrap and surface underlying data structures and calls to the full SDK swagger clients which are also included in the package.

Canvas

For interactions with the NiFi Canvas.

nipyapi.canvas.get_root_pg_id()[source]

Convenience function to return the UUID of the Root Process Group

Returns (str): The UUID of the root PG

nipyapi.canvas.recurse_flow(pg_id='root')[source]

Returns information about a Process Group and all its Child Flows. Recurses the child flows by appending each process group with a ‘nipyapi_extended’ parameter which contains the child process groups, etc.

Parameters:pg_id (str) – The Process Group UUID
Returns:enriched NiFi Flow object
Return type:(ProcessGroupFlowEntity)
nipyapi.canvas.get_flow(pg_id='root')[source]

Returns information about a Process Group and flow.

This surfaces the native implementation, for the recursed implementation see ‘recurse_flow’

Parameters:pg_id (str) – id of the Process Group to retrieve, defaults to the root process group if not set
Returns:The Process Group object
Return type:(ProcessGroupFlowEntity)
nipyapi.canvas.get_process_group_status(pg_id='root', detail='names')[source]

Returns an entity containing the status of the Process Group. Optionally may be configured to return a simple dict of name:id pairings

Note that there is also a ‘process group status’ command available, but it returns a subset of this data anyway, and this call is more useful

Parameters:
  • pg_id (str) – The UUID of the Process Group
  • detail (str) – ‘names’ or ‘all’; whether to return a simple dict of name:id pairings, or the full details. Defaults to ‘names’
Returns:

The Process Group Entity including the status

Return type:

(ProcessGroupEntity)

nipyapi.canvas.get_process_group(identifier, identifier_type='name')[source]

Filters the list of all process groups against a given identifier string occurring in a given identifier_type field.

Parameters:
  • identifier (str) – the string to filter the list for
  • identifier_type (str) – the field to filter on, set in config.py
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches
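Because the return shape depends on the number of matches, callers usually have to branch on the result before using it. As a minimal sketch, the hypothetical helper `as_list` below (not part of nipyapi) normalises such a result so that it can always be iterated:

```python
def as_list(result):
    """Normalise a filter-style result (None, one object, or a list) to a list."""
    if result is None:
        return []          # no matches
    if isinstance(result, list):
        return result      # multiple matches
    return [result]        # exactly one match
```

With this in place, something like `as_list(nipyapi.canvas.get_process_group('ingest'))` could be looped over regardless of how many Process Groups match (the name 'ingest' is illustrative only).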

nipyapi.canvas.list_all_process_groups()[source]

Returns a flattened list of all Process Groups on the canvas. Potentially slow if you have a large canvas.

Note that the ProcessGroupsApi().get_process_groups(pg_id) command only provides the first layer of pgs, whereas this trawls the entire canvas

Returns:list[ProcessGroupEntity]
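To illustrate the difference between reading one layer and trawling the whole canvas, here is a minimal sketch of a recursive flatten over a nested structure. It uses plain dicts with a hypothetical `children` key as stand-ins for Process Groups, and it is not nipyapi's implementation:

```python
def flatten_groups(group):
    """Return the given group followed by all of its descendants, depth-first."""
    flat = [group]
    for child in group.get("children", []):
        flat.extend(flatten_groups(child))
    return flat


# Stand-in canvas: root contains a (which contains a1) and b
canvas = {
    "name": "root",
    "children": [
        {"name": "a", "children": [{"name": "a1"}]},
        {"name": "b"},
    ],
}

# Only the first layer, comparable to a single non-recursive listing
first_layer = [g["name"] for g in canvas["children"]]
# Every group at every depth
all_names = [g["name"] for g in flatten_groups(canvas)]
```

The recursive walk visits every nested group once, which is why a call of this kind can become slow on a large canvas.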
nipyapi.canvas.delete_process_group(process_group, force=False, refresh=True)[source]

Deletes a given Process Group, with optional prejudice.

Parameters:
  • process_group (ProcessGroupEntity) – The target Process Group
  • force (bool) – Stop, purge and clean the target Process Group before deletion. Experimental.
  • refresh (bool) – Whether to refresh the state first
Returns:

The updated object state

Return type:

(ProcessGroupEntity)

nipyapi.canvas.schedule_process_group(process_group_id, scheduled)[source]

Start or Stop a Process Group and all components.

Note that this doesn’t guarantee that all components have started, as some may be in Invalid states.

Parameters:
  • process_group_id (str) – The UUID of the target Process Group
  • scheduled (bool) – True to start, False to stop
Returns:

True if successfully scheduled, False if not

Return type:

(bool)

nipyapi.canvas.create_process_group(parent_pg, new_pg_name, location)[source]

Creates a new Process Group with the given name under the provided parent Process Group at the given Location

Parameters:
  • parent_pg (ProcessGroupEntity) – The parent Process Group to create the new process group in
  • new_pg_name (str) – The name of the new Process Group
  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
Returns:

The new Process Group

Return type:

(ProcessGroupEntity)
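As a rough sketch of how the calls above might be combined, the hypothetical helper below creates a Process Group directly under the root. The `canvas` argument is expected to be the `nipyapi.canvas` module; it is passed in so that the sketch can also be tried against a stub. The use of 'id' as an identifier_type and the default coordinates are assumptions, not values confirmed by this reference.

```python
def create_pg_under_root(canvas, name, location=(400.0, 400.0)):
    """Create a Process Group directly under the root Process Group.

    `canvas` is expected to be the nipyapi.canvas module.
    """
    root_id = canvas.get_root_pg_id()
    # Assumption: 'id' is an accepted identifier_type for Process Groups
    root_pg = canvas.get_process_group(root_id, 'id')
    return canvas.create_process_group(root_pg, name, location)
```

Typical usage would be `create_pg_under_root(nipyapi.canvas, 'demo_pg')` against a running NiFi instance.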

nipyapi.canvas.list_all_processors()[source]

Returns a flat list of all Processors anywhere on the canvas

Returns:list[ProcessorEntity]
nipyapi.canvas.list_all_processor_types()[source]

Produces the list of all available processor types in the NiFi instance

Returns:A native datatype containing the processors list
Return type:list(ProcessorTypesEntity)
nipyapi.canvas.get_processor_type(identifier, identifier_type='name')[source]

Gets the abstract object describing a Processor, or list thereof

Parameters:
  • identifier (str) – the string to filter the list for
  • identifier_type (str) – the field to filter on, set in config.py
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.canvas.create_processor(parent_pg, processor, location, name=None, config=None)[source]

Instantiates a given processor on the canvas

Parameters:
  • parent_pg (ProcessGroupEntity) – The parent Process Group
  • processor (DocumentedTypeDTO) – The abstract processor type object to be instantiated
  • location (tuple[x, y]) – The location coordinates
  • name (Optional [str]) – The name for the new Processor
  • config (Optional [ProcessorConfigDTO]) – A configuration object for the new processor
Returns:

The new Processor

Return type:

(ProcessorEntity)
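Since get_processor_type may return None, a single object, or a list, it can help to resolve the type explicitly before instantiating it. The hypothetical helper below is one possible sketch; `canvas` is expected to be the `nipyapi.canvas` module, and the default coordinates are arbitrary.

```python
def add_processor(canvas, parent_pg, type_name, name, location=(400.0, 400.0)):
    """Instantiate a processor of the named type inside parent_pg.

    `canvas` is expected to be the nipyapi.canvas module.
    """
    proc_type = canvas.get_processor_type(type_name)
    if proc_type is None:
        raise ValueError("No processor type matches %r" % type_name)
    if isinstance(proc_type, list):
        # Ambiguous: refuse to guess which type was intended
        raise ValueError(
            "%r matches %d processor types" % (type_name, len(proc_type))
        )
    return canvas.create_processor(parent_pg, proc_type, location, name=name)
```

Raising on an ambiguous match is a design choice of this sketch; picking the first match would also be possible but could silently create the wrong processor.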

nipyapi.canvas.delete_processor(processor, refresh=True, force=False)[source]

Deletes a Processor from the canvas, with optional prejudice.

Parameters:
  • processor (ProcessorEntity) – The processor to delete
  • refresh (bool) – Whether to refresh the Processor state before action
  • force (bool) – Whether to stop the Processor before deletion. Behavior may change in future releases. Experimental.
Returns:

The updated ProcessorEntity

Return type:

(ProcessorEntity)

nipyapi.canvas.get_processor(identifier, identifier_type='name')[source]

Filters the list of all Processors against the given identifier string in the given identifier_type field

Parameters:
  • identifier (str) – The String to filter against
  • identifier_type (str) – The field to apply the filter to. Set in config.py
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.canvas.schedule_processor(processor, scheduled, refresh=True)[source]

Set a Processor to Start or Stop.

Note that this doesn’t guarantee that it will change state, merely that it will be instructed to try. Some effort is made to wait and see if the processor starts

Parameters:
  • processor (ProcessorEntity) – The Processor to target
  • scheduled (bool) – True to start, False to stop
  • refresh (bool) – Whether to refresh the object before action
Returns:

True for success, False for failure

Return type:

(bool)

nipyapi.canvas.update_processor(processor, update)[source]

Updates configuration parameters for a given Processor.

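An example update would be: nifi.ProcessorConfigDTO(scheduling_period='3s')

Parameters:
  • processor (ProcessorEntity) – The Processor to update
  • update (ProcessorConfigDTO) – The configuration parameters to apply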
Returns:

The updated ProcessorEntity

Return type:

(ProcessorEntity)
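The example update mentioned above could be wrapped as follows. This is only a sketch: `set_scheduling_period` is a hypothetical helper, and `nipyapi_pkg` is expected to be the imported `nipyapi` package, passed in so that the function can be exercised without a live NiFi instance.

```python
def set_scheduling_period(nipyapi_pkg, processor, period='3s'):
    """Change how often a processor is scheduled to run.

    `nipyapi_pkg` is expected to be the imported nipyapi package.
    """
    # Build a config object carrying only the field to change
    update = nipyapi_pkg.nifi.ProcessorConfigDTO(scheduling_period=period)
    return nipyapi_pkg.canvas.update_processor(processor, update)
```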

nipyapi.canvas.get_variable_registry(process_group, ancestors=True)[source]

Gets the contents of the variable registry attached to a Process Group

Parameters:
  • process_group (ProcessGroupEntity) – The Process Group to retrieve the Variable Registry from
  • ancestors (bool) – Whether to include the Variable Registries from ancestor (parent) Process Groups
Returns:

The Variable Registry

Return type:

(VariableRegistryEntity)

nipyapi.canvas.update_variable_registry(process_group, update)[source]

Updates one or more key:value pairs in the variable registry

Parameters:
  • process_group (ProcessGroupEntity) – The Process Group which has the Variable Registry to be updated
  • update (tuple[key, value]) – The variables to write to the registry
Returns:

The created or updated Variable Registry Entries

Return type:

(VariableRegistryEntity)

nipyapi.canvas.get_connections(pg_id)[source]

EXPERIMENTAL List all child connections within a given Process Group

Parameters:pg_id (str) – The UUID of the target Process Group
Returns:A native datatype which contains the list of all Connections in the Process Group
Return type:(ConnectionsEntity)
nipyapi.canvas.purge_connection(con_id)[source]

EXPERIMENTAL Drops all FlowFiles in a given connection. Waits until the action is complete before returning.

Note that if upstream component isn’t stopped, more data may flow into the connection after this action.

Parameters:con_id (str) – The UUID of the Connection to be purged
Returns:The status reporting object for the drop request.
Return type:(DropRequestEntity)
nipyapi.canvas.purge_process_group(process_group, stop=False)[source]

EXPERIMENTAL Purges the connections in a given Process Group of FlowFiles, and optionally stops it first

Parameters:
  • process_group (ProcessGroupEntity) – Target Process Group
  • stop (Optional [bool]) – Whether to stop the Process Group before action
Returns:

Result set. A list of Dicts of Connection IDs mapped to True or False for success of each connection

Return type:

(list[dict{ID: True|False}])
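A result set shaped as a list of single-entry dicts is awkward to inspect directly. The sketch below, with the hypothetical helper `failed_connections` and made-up connection IDs, shows one way to pull out the connections whose purge did not succeed:

```python
def failed_connections(purge_result):
    """Return the IDs of connections whose purge reported False."""
    failed = []
    for entry in purge_result:
        for con_id, succeeded in entry.items():
            if not succeeded:
                failed.append(con_id)
    return failed


# Illustrative result set with made-up connection IDs
example = [{"con-1": True}, {"con-2": False}, {"con-3": True}]
still_full = failed_connections(example)
```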

nipyapi.canvas.get_bulletins()[source]

Retrieves current bulletins (alerts) from the Flow Canvas

Returns:The native datatype containing a list of bulletins
Return type:(ControllerBulletinsEntity)

nipyapi.canvas.get_bulletin_board()[source]

Retrieves the bulletin board object

Returns:The native datatype BulletinBoard object
Return type:(BulletinBoardEntity)

Config

A set of defaults and parameters used elsewhere in the project. Also provides a handy link to the low-level client SDK configuration singleton objects.

Security

Secure connectivity management for NiPyApi

nipyapi.security.create_service_user(identity, service='nifi')[source]

Attempts to create a user with the provided identity in the given service

Parameters:
  • identity (str) – Identity string for the user
  • service (str) – ‘nifi’ or ‘registry’
Returns:

The new (User) or (UserEntity) object

nipyapi.security.service_login(service='nifi', username=None, password=None, bool_response=False)[source]

Login to the currently configured NiFi or NiFi-Registry server.

Login requires a secure connection over https. Prior to calling this method, the host must be specified and the SSLContext should be configured (if necessary).

Successful login will result in a generated token (JWT) being cached in the api_client config that will be passed in all future REST API calls. To clear that token, call service_logout.

The token is temporary and will expire after a duration set by the server. After a token expires, you must call this method again to generate a new token.

Parameters:
  • service (str) – ‘nifi’ or ‘registry’; the service to login to
  • username (str) – The username to submit
  • password (str) – The password to use
  • bool_response (bool) – If True, the function will return False instead of an error. Useful for connection testing.
Returns:

True if successful, False or an Error if not. See bool_response

Return type:

(bool)
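The login sequence described above might look roughly like the sketch below. `login_over_tls` is a hypothetical helper, `security` is expected to be the `nipyapi.security` module, and the host is assumed to have been configured already. It covers only the one-way TLS case with a CA file.

```python
def login_over_tls(security, username, password, ca_file, service='nifi'):
    """Configure one-way TLS, then try to log in.

    `security` is expected to be the nipyapi.security module.
    Returns True on success and False on failure.
    """
    # Trust the CA that signed the server certificate
    security.set_service_ssl_context(service=service, ca_file=ca_file)
    # bool_response=True asks for False instead of a raised error
    return bool(security.service_login(
        service=service,
        username=username,
        password=password,
        bool_response=True,
    ))
```

Because the token expires after a server-defined duration, a long-running script would need to call a helper like this again once requests start failing.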

nipyapi.security.set_service_auth_token(token=None, token_name='tokenAuth', service='nifi')[source]

Helper method to set the auth token correctly for the specified service

Parameters:
  • token (Optional[str]) – The token to set. Defaults to None.
  • token_name (str) – the api_key field name to set the token to. Defaults to ‘tokenAuth’
  • service (str) – ‘nifi’ or ‘registry’, the service to set
Returns:

True on success, False if token not set

Return type:

(bool)

nipyapi.security.service_logout(service='nifi')[source]

Logs out from the service by resetting the token

Parameters:service (str) – ‘nifi’ or ‘registry’; the target service
Returns:True if access removed, False if still set
Return type:(bool)
nipyapi.security.get_service_access_status(service='nifi', bool_response=False)[source]

Gets the access status for the current session

Parameters:
  • service (str) – A String of ‘nifi’ or ‘registry’ to indicate which service to check status for
  • bool_response (bool) – If True, the function will return False on hitting an Error instead of raising it. Useful for connection testing.
Returns:

(bool) if bool_response, else the Service Access Status of the User

nipyapi.security.add_user_to_access_policy(user, policy, service='nifi', refresh=True)[source]

Attempts to add the given user object to the given access policy

Parameters:
  • user (User) or (UserEntity) – User object to add
  • policy (AccessPolicyEntity) or (AccessPolicy) – Access Policy object
  • service (str) – ‘nifi’ or ‘registry’ to identify the target service
  • refresh (bool) – Whether to refresh the policy object before submission
Returns:

Updated Policy object

nipyapi.security.update_access_policy(policy, service='nifi')[source]

Applies an updated access policy to the service indicated

Parameters:
  • policy (PolicyEntity) – The policy object to submit
  • service (str) – ‘nifi’ or ‘registry’ to indicate the target service
Returns:

The updated policy if successful

Return type:

(PolicyEntity)

nipyapi.security.get_access_policy_for_resource(resource, action, r_id=None, service='nifi', auto_create=False)[source]

Attempts to retrieve the access policy for a given resource and action, and optionally resource_id if targeting NiFi. Optionally creates the policy if it doesn’t already exist

Parameters:
  • resource (str) – A valid resource in the target service
  • action (str) – A valid action, typically ‘read’, ‘write’ or ‘delete’
  • r_id (Optional[str]) – The UUID of the resource, valid only if targeting NiFi resources
  • service (str) – Which service to target, typically ‘nifi’ or ‘registry’
  • auto_create (bool) – Whether to create the targeted policy if it doesn’t already exist
Returns:

The relevant AccessPolicy object

nipyapi.security.create_access_policy(resource, action, r_id=None, service='nifi')[source]

Creates an access policy for the given resource, action and optionally resource_id for NiFi.

Parameters:
  • resource (str) – a valid resource type for this service, e.g. ‘bucket’
  • action (str) – a valid action type for this service, typically ‘read’, ‘write’ or ‘delete’
  • r_id (optional[str]) – if NiFi, the resource ID of the resource
  • service (str) – the service to target
Returns:

An access policy object for that service

nipyapi.security.list_service_users(service='nifi')[source]

Lists all users of a given service. Takes a service name as a string.

nipyapi.security.get_service_user(identifier, identifier_type='identity', service='nifi')[source]

Filters the list of all users for a given identifier and type

Parameters:
  • identifier (str) – the string to search for
  • identifier_type (str) – the field to search in
  • service (str) – the name of the service
Returns:

None if no match, list of multiple matches, else single object

nipyapi.security.set_service_ssl_context(service='nifi', ca_file=None, client_cert_file=None, client_key_file=None, client_key_password=None)[source]

Create an SSLContext for connecting over https to a secured NiFi or NiFi-Registry instance.

This method can be used to create an SSLContext for two-way TLS in which a client cert is used by the service to authenticate the client.

This method can also be used for one-way TLS in which the client verifies the server’s certificate, but authenticates using a different form of credentials, such as LDAP username/password.

If you are using one-way TLS with a certificate signed by a root CA trusted by your system/platform, this step is not necessary as the default TLS-handshake should “just work.”

Parameters:
  • service (str) – ‘nifi’ or ‘registry’ to indicate which service config to set the ssl context to
  • ca_file (str) – A PEM file containing certs for the root CA(s) for the NiFi Registry server
  • client_cert_file (str) – A PEM file containing the public certificates for the user/client identity
  • client_key_file (str) – An encrypted (password-protected) PEM file containing the client’s secret key
  • client_key_password (str) – The password to decrypt the client_key_file
Returns:

(None)

System

For system and cluster level functions interacting with the underlying NiFi Services

nipyapi.system.get_system_diagnostics()[source]

Returns the NiFi System Diagnostics page

Returns (json):

nipyapi.system.get_cluster()[source]

EXPERIMENTAL Returns the contents of the NiFi cluster

Returns (json):

nipyapi.system.get_node(nid)[source]

Returns the cluster node information

Parameters:nid (str) – The UUID of the Node to target

Returns:

nipyapi.system.get_nifi_version_info()[source]

Returns the version information of the connected NiFi instance

Returns (VersionInfoDTO):

Templates

For managing flow deployments

nipyapi.templates.list_all_templates()[source]

Gets a list of all templates on the canvas

Returns:A list of TemplateEntity objects
Return type:(list[TemplateEntity])
nipyapi.templates.get_template_by_name(name)[source]

DEPRECATED Returns a specific template by name, if it exists.

Note: This function is replaced by get_template

Parameters:name (str) – The Name of the template, exact match required
Returns:(TemplateEntity)
nipyapi.templates.deploy_template(pg_id, template_id, loc_x=0, loc_y=0)[source]

Instantiates a given template request in a given process group

Parameters:
  • pg_id (str) – The UUID of the Process Group to deploy into
  • template_id (str) – The UUID of the Template to deploy. Note that the Template must already be uploaded and available to the target Process Group
  • loc_x (int) – The X coordinate to deploy the Template at. Default(0)
  • loc_y (int) – The Y coordinate to deploy the Template at. Default(0)
Returns:

The FlowEntity of the Process Group with the deployed template

Return type:

(FlowEntity)

nipyapi.templates.upload_template(pg_id, template_file)[source]

Uploads a given template xml from the file system to the given Process Group

Parameters:
  • pg_id (str) – The UUID of the Process Group to upload to
  • template_file (str) – The path including filename to the template file
Returns:

The new Template object

Return type:

(TemplateEntity)
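Since a Template must be uploaded before it can be deployed, the two calls are often used together. The hypothetical helper below sketches that sequence; `templates` is expected to be the `nipyapi.templates` module, and reading the UUID from `.id` on the returned TemplateEntity is an assumption.

```python
def upload_and_deploy(templates, pg_id, template_file, loc_x=0, loc_y=0):
    """Upload a template XML file, then instantiate it in the same Process Group.

    `templates` is expected to be the nipyapi.templates module.
    """
    template = templates.upload_template(pg_id, template_file)
    # Assumption: the returned TemplateEntity exposes its UUID as `.id`
    return templates.deploy_template(pg_id, template.id, loc_x, loc_y)
```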

nipyapi.templates.create_pg_snippet(pg_id)[source]

Creates a snippet of the targeted process group, and returns the object ready to be turned into a Template

Parameters:pg_id – UUID of the process Group to snippet
Returns:The Snippet Object
Return type:(SnippetEntity)
nipyapi.templates.create_template(pg_id, name, desc='')[source]

Creates a Template from a Process Group

Parameters:
  • pg_id (str) – The UUID of the target Process Group
  • name (str) – The name for the new Template. Must be unique
  • desc (optional[str]) – The description for the new Template
Returns:

The newly created Template

Return type:

(TemplateEntity)

nipyapi.templates.delete_template(t_id)[source]

Deletes a Template

Parameters:t_id (str) – UUID of the Template to be deleted
Returns:The updated Template object
nipyapi.templates.export_template(t_id, output='string', file_path=None)[source]

Exports a given Template as either a string or a file.

Note that to reimport the Template it must be a file

Parameters:
  • t_id (str) – The UUID of the Template to export
  • output (str) – ‘string’ or ‘file’ to set the export action
  • file_path (Optional [str]) – The full path including filename to write the Template export to
Returns:

A String representation of the exported Template XML. Note that this may not be utf-8 encoded.

Return type:

(str)

nipyapi.templates.get_template(identifier, identifier_type='name')[source]

Filters the list of all Templates for a given string in a given field. Note that filters are configured in config.py

Parameters:
  • identifier (str) – The string to filter on
  • identifier_type (str) – The identifier of the field to filter on
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

Utils

Convenience utility functions for NiPyApi, not really intended for external use

nipyapi.utils.dump(obj, mode='json')[source]

Dumps a native datatype object to json or yaml, defaults to json

Parameters:
  • obj (varies) – The native datatype object to serialise
  • mode (str) – ‘json’ or ‘yaml’, the supported export modes

Returns (str): The serialised object

nipyapi.utils.load(obj, dto=None)[source]

Loads a serialised object back into native datatypes, and optionally imports it back into the native NiFi DTO

Warning: Using this on objects not produced by this Package may have unintended results! While efforts have been made to ensure that unsafe loading is not possible, no stringent security testing has been completed.

Parameters:
  • obj (dict, list) – The serialised object to import
  • dto (Optional [tuple{str, str}]) – A Tuple describing the service and object that should be constructed.
Returns: Either the loaded object in native Python datatypes, or the constructed native datatype object
nipyapi.utils.fs_read(file_path)[source]

Convenience function to read an Object from a FilePath

Parameters:file_path (str) – The Full path including filename to read from

Returns: The object that was read

nipyapi.utils.fs_write(obj, file_path)[source]

Convenience function to write an Object to a FilePath

Parameters:
  • obj (varies) – The Object to write out
  • file_path (str) – The Full path including filename to write to

Returns: The object that was written

nipyapi.utils.filter_obj(obj, value, key)[source]

Implements a custom filter method because native datatypes don’t have consistently named or located fields.

Note that each object used by this function must be registered with identifier_types and identifiers in config

Parameters:
  • obj (varies) – the NiFi or NiFi-Registry object to filter on
  • value (str) – the String value to look for
  • key (str) – the object key to filter against

Returns: None if 0 matches, list if > 1, single Object entity if ==1

nipyapi.utils.wait_to_complete(test_function, *args, **kwargs)[source]

Implements a basic return loop for a given function which is capable of a True|False output

Parameters:
  • test_function – Function which returns a bool once the target state is reached
  • delay (int) – The number of seconds between each attempt, defaults to config.short_retry_delay
  • max_wait (int) – the maximum number of seconds before issuing a Timeout, defaults to config.short_max_wait
  • *args – Any args to pass through to the test function
  • **kwargs – Any Keyword Args to pass through to the test function

Returns (bool): True for success, False for not
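The general shape of such a retry loop can be sketched as below. This is not nipyapi's implementation: `poll_until` is a hypothetical name, the default delay and maximum wait are arbitrary rather than taken from config, and returning False on timeout is an assumption of the sketch.

```python
import time


def poll_until(test_function, *args, delay=0.5, max_wait=3.0, **kwargs):
    """Call test_function repeatedly until it returns True or time runs out.

    Returns True if the target state was reached within max_wait seconds,
    False otherwise.
    """
    deadline = time.monotonic() + max_wait
    while True:
        if test_function(*args, **kwargs):
            return True
        # Give up if the next attempt would start after the deadline
        if time.monotonic() + delay > deadline:
            return False
        time.sleep(delay)
```

A monotonic clock is used so that changes to the system time do not shorten or extend the wait.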

nipyapi.utils.is_endpoint_up(endpoint_url)[source]

Tests if a URL is available for requests

Parameters:endpoint_url (str) – The URL to test

Returns (bool): True for a 200 response, False for not

nipyapi.utils.set_endpoint(endpoint_url)[source]

EXPERIMENTAL

Sets the endpoint when switching between instances of NiFi or other projects. Not tested extensively with secured instances.

Parameters:
  • endpoint_url (str) – The URL to set as the endpoint. Autodetects the relevant service, e.g. ‘http://localhost:18080/nifi-registry-api’

Returns (bool): True for success, False for not

nipyapi.utils.start_docker_containers(docker_containers, network_name='demo')[source]

Deploys a list of DockerContainer objects on a given network

Parameters:
  • docker_containers (list[DockerContainer]) – list of Dockers to start
  • network_name (str) – The name of the Docker Bridge Network to get or create for the Docker Containers

Returns: Nothing

class nipyapi.utils.DockerContainer(name=None, image_name=None, image_tag=None, ports=None, env=None, volumes=None, test_url=None, endpoint=None)[source]

Bases: object

Helper class for Docker container automation without using Ansible

get_test_url_status()[source]

Checks if a URL is available

Returns:status code if available, String ‘ConnectionError’ if not

Versioning

For interactions with the NiFi Registry Service and related functions

nipyapi.versioning.create_registry_client(name, uri, description)[source]

Creates a Registry Client in the NiFi Controller Services

Parameters:
  • name (str) – The name of the new Client
  • uri (str) – The URI for the connection, such as ‘http://registry:18080’
  • description (str) – A description for the Client
Returns:

The new registry client object

Return type:

(RegistryClientEntity)

nipyapi.versioning.list_registry_clients()[source]

Lists the available Registry Clients in the NiFi Controller Services

Returns:(list[RegistryClientEntity]) objects
nipyapi.versioning.delete_registry_client(client, refresh=True)[source]

Deletes a Registry Client from the list of NiFi Controller Services

Parameters:
  • client (RegistryClientEntity) – The client to delete
  • refresh (bool) – Whether to refresh the object before action
Returns:

The updated client object

Return type:

(RegistryClientEntity)

nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]

Filters the Registry clients to a particular identifier

Parameters:
  • identifier (str) – the filter string
  • identifier_type (str) – the parameter to filter on
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.list_registry_buckets()[source]

Lists all available Buckets in the NiFi Registry

Returns:(list[Bucket]) objects
nipyapi.versioning.create_registry_bucket(name)[source]

Creates a new Registry Bucket

Parameters:name (str) – name for the bucket, must be unique in the Registry
Returns:The new Bucket object
Return type:(Bucket)
nipyapi.versioning.delete_registry_bucket(bucket)[source]

Removes a bucket from the NiFi Registry

Parameters:bucket (Bucket) – the Bucket object to remove
Returns:The updated Bucket object
Return type:(Bucket)
nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name')[source]

Filters the Bucket list to a particular identifier

Parameters:
  • identifier (str) – the filter string
  • identifier_type (str) – the param to filter on
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True)[source]

Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow

Parameters:
  • process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version
  • registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to
  • bucket (Bucket) – the Bucket on the NiFi Registry to save to
  • flow_name (str) – A name for the VersionedFlow in the Bucket. Note that you need either a name for a new VersionedFlow, or the ID of an existing one, to save a new version
  • flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow
  • comment (str) – A comment for the version commit
  • desc (str) – A description of the VersionedFlow
  • refresh (bool) – whether to refresh the object revisions before action
Returns:

(VersionControlInformationEntity)
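Putting a Process Group under version control for the first time involves a Registry Client, a Bucket, and then the save itself. The hypothetical helper below sketches that sequence using only the calls documented in this section; `versioning` is expected to be the `nipyapi.versioning` module, and the client name, description, and comment strings are placeholders.

```python
def put_under_version_control(versioning, process_group, registry_uri,
                              bucket_name, flow_name):
    """Create a Registry Client and Bucket, then save the first flow version.

    `versioning` is expected to be the nipyapi.versioning module.
    """
    client = versioning.create_registry_client(
        name='demo_client',          # hypothetical client name
        uri=registry_uri,
        description='Created by an example script',
    )
    bucket = versioning.create_registry_bucket(bucket_name)
    # flow_name (rather than flow_id) because this is a new VersionedFlow
    return versioning.save_flow_ver(
        process_group=process_group,
        registry_client=client,
        bucket=bucket,
        flow_name=flow_name,
        comment='Initial version',
    )
```

To save a later version of the same flow, the equivalent call would pass `flow_id` of the existing VersionedFlow instead of `flow_name`.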

nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]

List of all Flows in a given NiFi Registry Bucket

Parameters:bucket_id (str) – The UUID of the Bucket to fetch from
Returns:(list[VersionedFlow]) objects
nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name')[source]

Filters the Flows in a Bucket against a particular identifier

Parameters:
  • bucket_id (str) – UUID of the Bucket to filter against
  • identifier (str) – The string to filter on
  • identifier_type (str) – The param to check
Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]

Gets the most recent version of a VersionedFlowSnapshot from a bucket

Parameters:
  • bucket_id (str) – the UUID of the Bucket containing the flow
  • flow_id (str) – the UUID of the VersionedFlow to be retrieved
Returns:

(VersionedFlowSnapshot)

nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]

Changes a versioned flow to the specified version, or the latest version

Parameters:
  • process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change
  • target_version (Optional [None, Int]) – Either None to move to the latest available version, or Int of the version number to move to
Returns:

True if successful, False if not

Return type:

(bool)

nipyapi.versioning.get_version_info(process_group)[source]

Gets the Version Control information for a particular Process Group

Parameters:process_group (ProcessGroupEntity) – the ProcessGroup to work with
Returns:(VersionControlInformationEntity)
nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]

Creates a new VersionedFlow stub in NiFi Registry. The stub can then be used as a target for writing VersionedFlow information without using a NiFi Process Group directly

Parameters:
  • bucket_id (str) – UUID of the Bucket to write to
  • flow_name (str) – Name for the new VersionedFlow object
  • flow_desc (Optional [str]) – Description for the new VersionedFlow object
  • flow_type (Optional [str]) – Type of the VersionedFlow, should be ‘Flow’
Returns:

(VersionedFlow)

nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]

EXPERIMENTAL

Writes a FlowSnapshot into a VersionedFlow as a new version update

Note that this differs from save_flow_ver which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.

Parameters:
  • bucket_id (str) – Deprecated, now pulled from the flow parameter
  • flow (VersionedFlowObject) – the VersionedFlow object to write to
  • flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow
  • refresh (bool) – Whether to refresh the object status before actioning
  • raw_snapshot (bool) – Deprecated, as not using a full snapshot resulted in inconsistent behavior
Returns:

The new (VersionedFlowSnapshot)

nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]

Retrieves the latest, or a specific, version of a Flow

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow
  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
  • version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
  • export (bool) – True to get the raw json object from the server for export, False to get the native DataType
Returns:

(VersionedFlowSnapshot) if export=False, or the raw json otherwise

WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135, which means you can’t trust the version count

nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]

Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow
  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
  • version (Optional [None, Str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
  • file_path (str) – The path and filename to write to. Defaults to None which returns the serialised obj
  • mode (str) – ‘json’ or ‘yaml’ to specify the encoding format
Returns:

(str) of the encoded Snapshot

nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]

Imports a given encoded_flow version into the bucket and flow described, may optionally be passed a file to read the encoded flow_contents from.

Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.

Parameters:
  • bucket_id (str) – UUID of the bucket to write the encoded_flow version
  • encoded_flow (Optional [str]) – The encoded flow to import; if not specified file_path is read from.
  • file_path (Optional [str]) – The file path to read the encoded flow from; if not specified, encoded_flow is read from.
  • flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.
  • flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.
Returns:

The new (VersionedFlowSnapshot)
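The "only one of" rule above is easy to get wrong when arguments are assembled programmatically. The sketch below shows one way to check it before calling the import, and then copies the latest version of a flow into another bucket on the same Registry. `exactly_one` and `copy_flow_version` are hypothetical helpers, and `versioning` is expected to be the `nipyapi.versioning` module.

```python
def exactly_one(**options):
    """Return the name of the single option that is not None, else raise."""
    given = [name for name, value in options.items() if value is not None]
    if len(given) != 1:
        raise ValueError(
            "Specify exactly one of %s, got %d" % (sorted(options), len(given))
        )
    return given[0]


def copy_flow_version(versioning, src_bucket_id, src_flow_id,
                      dst_bucket_id, flow_name=None, flow_id=None):
    """Export the latest version of a flow and import it into another bucket.

    `versioning` is expected to be the nipyapi.versioning module.
    """
    exactly_one(flow_name=flow_name, flow_id=flow_id)
    # file_path is left unset, so the encoded flow is returned as a string
    encoded = versioning.export_flow_version(
        src_bucket_id, src_flow_id, mode='json'
    )
    return versioning.import_flow_version(
        dst_bucket_id,
        encoded_flow=encoded,
        flow_name=flow_name,
        flow_id=flow_id,
    )
```

Copying between separate environments would additionally require switching the configured endpoint between the export and the import, which this sketch does not attempt.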

nipyapi.versioning.list_flow_versions(bucket_id, flow_id)[source]

EXPERIMENTAL List all the versions of a given Flow in a given Bucket

Parameters:
  • bucket_id (str) – UUID of the bucket holding the flow to be enumerated
  • flow_id (str) – UUID of the flow in the bucket to be enumerated
Returns:

list(VersionedFlowSnapshotMetadata)

nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]

Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.

Parameters:
  • parent_id (str) – The ID of the parent Process Group to create the new process group in.
  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
  • bucket_id (str) – ID of the bucket containing the versioned flow to deploy.
  • flow_id (str) – ID of the versioned flow to deploy.
  • reg_client_id (str) – ID of the Registry Client to use for the deployment.
  • version (Optional [int,str]) – version to deploy, if not provided latest version will be deployed.
Returns:

(ProcessGroupEntity) of the newly deployed Process Group