Versioning
Version control operations
For interactions with the NiFi Registry Service and related functions
- nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]
Creates a new VersionedFlow stub in NiFi Registry. The stub can then receive VersionedFlow information without a NiFi Process Group being involved directly.
- nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]
EXPERIMENTAL
Writes a FlowSnapshot into a VersionedFlow as a new version update
Note that this differs from save_flow_ver which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.
- Parameters:
flow (VersionedFlowObject) – the VersionedFlow object to write to
flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow
refresh (bool) – Whether to refresh the object status before actioning
- Returns:
The new (VersionedFlowSnapshot)
- nipyapi.versioning.create_registry_bucket(name, description=None)[source]
Creates a new Registry Bucket
- nipyapi.versioning.create_registry_client(name, uri=None, description='', reg_type=None, properties=None, ssl_context_service=None)[source]
Creates a Registry Client in the NiFi Controller Services
- Parameters:
name (str) – The name of the new Client
uri (str, optional) – The URI for the connection. Required for NiFi Registry. Ignored for Git-based registries. Defaults to None.
description (str, optional) – A description for the Client. Defaults to empty string.
reg_type (str, optional) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’. Other options include:
  - ‘org.apache.nifi.github.GitHubFlowRegistryClient’
  - ‘org.apache.nifi.gitlab.GitLabFlowRegistryClient’
  - ‘org.apache.nifi.atlassian.bitbucket.BitbucketFlowRegistryClient’
  - ‘org.apache.nifi.azure.devops.AzureDevOpsFlowRegistryClient’
properties (dict, optional) – Properties to configure the client. If provided, these are used directly. If not provided, defaults are used based on reg_type. For NiFi Registry, defaults to {‘url’: uri} if uri is provided. For Git-based registries, starts empty (must be configured).
ssl_context_service (ControllerServiceEntity, optional) – SSL Context Service (only applicable for NiFi Registry type). Defaults to None.
- Returns:
The new registry client object
- Return type:
Example
>>> # NiFi Registry client
>>> nifi_reg = nipyapi.versioning.create_registry_client(
...     name='my-registry',
...     uri='http://localhost:18080',
...     description='My NiFi Registry'
... )
>>> # GitHub client with properties
>>> github_client = nipyapi.versioning.create_registry_client(
...     name='github-reg',
...     reg_type='org.apache.nifi.github.GitHubFlowRegistryClient',
...     description='GitHub Registry',
...     properties={
...         'Repository Owner': 'myorg',
...         'Repository Name': 'myrepo',
...         'Authentication Type': 'PERSONAL_ACCESS_TOKEN',
...         'Personal Access Token': 'ghp_xxx...',
...         'Default Branch': 'main'
...     }
... )
- nipyapi.versioning.delete_registry_client(client, refresh=True)[source]
Deletes a Registry Client from the list of NiFi Controller Services
- Parameters:
client (FlowRegistryClientEntity) – The client to delete
refresh (bool) – Whether to refresh the object before action
- Returns:
The updated client object
- Return type:
- nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]
Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.
- Parameters:
parent_id (str) – The ID of the parent Process Group to create the new process group in.
location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent
bucket_id (str) – ID of the bucket containing the versioned flow to deploy.
reg_client_id (str) – ID of the registry client connection to use.
flow_id (str) – ID of the versioned flow to deploy.
version (Optional [int,str]) – version to deploy, if not provided latest version will be deployed.
- Returns:
(ProcessGroupEntity) of the newly deployed Process Group
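The rule that an omitted version means "deploy the latest" can be sketched as below. This is only an illustration of the documented behaviour, not nipyapi's implementation: `resolve_version` is a hypothetical helper, and it assumes versions are integers where the highest number is the latest.

```python
def resolve_version(available, version=None):
    """Pick the version to deploy from a list of integer version numbers.

    Hypothetical helper for illustration; not part of nipyapi.
    """
    if not available:
        raise ValueError("flow has no versions")
    if version is None:
        # No version requested: assume the highest number is the latest
        return max(available)
    wanted = int(version)  # accept an int or a numeric string
    if wanted not in available:
        raise ValueError(f"version {wanted} not found")
    return wanted


latest = resolve_version([1, 2, 3])
pinned = resolve_version([1, 2, 3], "2")
```

Accepting both `int` and `str` mirrors the documented `Optional [int,str]` type of the `version` parameter.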
- nipyapi.versioning.deploy_git_registry_flow(registry_client_id, bucket_id, flow_id, parent_id, location=None, version=None, branch=None)[source]
Deploy a flow from a Git-based registry to the NiFi canvas.
Creates a new Process Group under the specified parent, linked to version control from the Git-based registry. The deployed flow will track the specified version (or latest) from the repository.
- Parameters:
registry_client_id (str) – The ID of the Git-based registry client.
bucket_id (str) – The bucket name (folder name) containing the flow.
flow_id (str) – The flow name (filename without .json).
parent_id (str) – The ID of the parent Process Group to deploy into.
location (tuple, optional) – (x, y) coordinates for placement. Defaults to (0, 0).
version (str, optional) – Specific version (commit hash) to deploy. If None, deploys the latest version.
branch (str, optional) – The branch to deploy from. If None, uses the registry client’s configured default branch.
- Returns:
The newly deployed Process Group.
- Return type:
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> root_id = nipyapi.canvas.get_root_pg_id()
>>> pg = nipyapi.versioning.deploy_git_registry_flow(
...     client.id, 'flows', 'http-responder', root_id
... )
- nipyapi.versioning.ensure_registry_bucket(name, description=None)[source]
Ensures a Registry Bucket exists, creating it if necessary.
This is a convenience function that implements the common pattern of:
1. Try to get existing bucket by name
2. If not found, create it
3. Handle race conditions gracefully
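The get-or-create pattern described for ensure_registry_bucket can be sketched as follows. The sketch uses an in-memory stand-in rather than a live NiFi Registry; `InMemoryRegistry`, `BucketExistsError`, and `ensure_bucket` are hypothetical names chosen for illustration, and the real function may handle conflicts differently.

```python
class BucketExistsError(Exception):
    """Hypothetical error raised when a bucket name is already taken."""


class InMemoryRegistry:
    """Hypothetical stand-in for a bucket store; not part of nipyapi."""

    def __init__(self):
        self.buckets = {}

    def get_bucket(self, name):
        # Returns None when no bucket has this name
        return self.buckets.get(name)

    def create_bucket(self, name, description=None):
        if name in self.buckets:
            raise BucketExistsError(name)
        self.buckets[name] = {"name": name, "description": description}
        return self.buckets[name]


def ensure_bucket(registry, name, description=None):
    """Return the bucket called `name`, creating it if it does not exist."""
    existing = registry.get_bucket(name)  # 1. try to get by name
    if existing is not None:
        return existing
    try:
        return registry.create_bucket(name, description)  # 2. create it
    except BucketExistsError:
        # 3. another caller created it between our get and create
        return registry.get_bucket(name)


registry = InMemoryRegistry()
first = ensure_bucket(registry, "flows", "demo bucket")
second = ensure_bucket(registry, "flows")
```

Calling the function twice returns the same bucket, which is what makes this kind of helper safe to use in repeatable setup scripts.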
- nipyapi.versioning.ensure_registry_client(name, uri=None, description='', reg_type=None, properties=None, ssl_context_service=None)[source]
Ensures a Registry Client exists with the desired configuration.
This is a convenience function that implements the common pattern of:
1. Try to get existing client by name
2. If found and properties provided, update the client
3. If not found, create it
4. Handle race conditions gracefully
For Git-based registries (GitHub, GitLab, etc.), if properties are provided and client exists, the client will be updated. This supports CI/CD workflows where branch or credentials may change between deployments. Sensitive properties (like tokens) are always applied if provided since existing values cannot be inspected.
- Parameters:
name (str) – The name of the Client
uri (str, optional) – The URI for the connection. Required for NiFi Registry. Ignored for Git-based registries. Defaults to None.
description (str, optional) – A description for the Client. Defaults to empty string.
reg_type (str, optional) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’
properties (dict, optional) – Properties to configure the client. If provided and client exists, will update the existing client with these properties.
ssl_context_service (ControllerServiceEntity, optional) – SSL Context Service
- Returns:
The registry client object (existing, updated, or new)
- Return type:
Note
For NiFi Registry clients, URI mismatch triggers recreation. For Git-based clients, properties trigger an update (not recreation).
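One way to read the note above is as a small decision table. The sketch below encodes that reading; it is not nipyapi's code. `plan_client_action` and the plain-dict shape of `existing` are hypothetical, and the outcome for cases the note does not mention (for example, a NiFi Registry client whose URI already matches) is an assumption.

```python
NIFI_REGISTRY_TYPE = (
    "org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient"
)


def plan_client_action(existing, reg_type=None, uri=None, properties=None):
    """Return 'create', 'recreate', 'update', or 'keep'.

    `existing` is None or a dict such as {'uri': ...}. This shape is a
    hypothetical simplification, not the FlowRegistryClientEntity model.
    """
    reg_type = reg_type or NIFI_REGISTRY_TYPE  # documented default type
    if existing is None:
        return "create"
    if reg_type == NIFI_REGISTRY_TYPE:
        # NiFi Registry: a URI mismatch triggers recreation
        if uri is not None and existing.get("uri") != uri:
            return "recreate"
        return "keep"  # assumption: a matching URI leaves the client as-is
    # Git-based: supplied properties trigger an update, not recreation
    return "update" if properties else "keep"


github = "org.apache.nifi.github.GitHubFlowRegistryClient"
actions = [
    plan_client_action(None, uri="http://localhost:18080"),
    plan_client_action({"uri": "http://a"}, uri="http://b"),
    plan_client_action({"uri": "http://a"}, uri="http://a"),
    plan_client_action({}, reg_type=github,
                       properties={"Default Branch": "main"}),
]
```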
- nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]
Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.
- Parameters:
bucket_id (str) – the UUID of the bucket containing the Flow
flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
version (Optional [None, Str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
file_path (str) – The path and filename to write to. Defaults to None which returns the serialised obj
mode (str) – ‘json’ or ‘yaml’ to specify the encoding format
- Returns:
(str) of the encoded Snapshot
- nipyapi.versioning.export_process_group_definition(process_group, file_path=None, mode='json')[source]
Export a process group as a flow definition (NiFi 2.x format). Does NOT require NiFi Registry - exports the current state of the process group.
- Parameters:
process_group (ProcessGroupEntity) – The process group to export
file_path (str, optional) – Path to write the export to. If None, returns the serialized string
mode (str) – Export format - ‘json’ or ‘yaml’. Defaults to ‘json’
- Returns:
The serialized flow definition if file_path is None, otherwise the path written to
- Return type:
Example
>>> pg = nipyapi.canvas.get_process_group('my-flow')
>>> nipyapi.versioning.export_process_group_definition(
...     pg, file_path='my-flow.json', mode='json'
... )
'my-flow.json'
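The return convention documented above (serialized string when file_path is None, otherwise the path written to) can be illustrated with a standard-library sketch. `export_definition` and the sample `flow` dict are hypothetical; only the JSON mode is shown because YAML would need a third-party library.

```python
import json
import os
import tempfile


def export_definition(definition, file_path=None, mode="json"):
    """Serialize `definition`; write to file_path if given, else return text.

    Hypothetical helper for illustration; not part of nipyapi.
    """
    if mode != "json":
        # YAML is omitted here to keep the sketch standard-library only
        raise ValueError("this sketch only supports mode='json'")
    text = json.dumps(definition, indent=2, sort_keys=True)
    if file_path is None:
        return text  # caller receives the serialized string
    with open(file_path, "w", encoding="utf-8") as handle:
        handle.write(text)
    return file_path  # caller receives the path that was written


flow = {"name": "my-flow", "processors": []}  # hypothetical definition
as_text = export_definition(flow)
target = os.path.join(tempfile.mkdtemp(), "my-flow.json")
written = export_definition(flow, file_path=target)
```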
- nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name', greedy=True)[source]
Filters the Flows in a Bucket against a particular identifier
- Parameters:
- Returns:
None for no matches, Single Object for unique match, list(Objects) for multiple matches
- nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]
Retrieves the latest, or a specific, version of a Flow
- Parameters:
bucket_id (str) – the UUID of the bucket containing the Flow
flow_id (str) – the UUID of the Flow to be retrieved from the Bucket
version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version
export (bool) – True to get the raw json object from the server for export, False to get the native DataType
- Returns:
The native DataType if export=False, or the raw json otherwise
- Return type:
WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135, which means you sometimes can’t trust the version count
- nipyapi.versioning.get_git_registry_bucket(registry_client_id, identifier, greedy=True, branch=None)[source]
Filters the bucket list from a Git-based registry client.
- Parameters:
registry_client_id (str) – The ID of the Git-based registry client.
identifier (str) – The bucket name (folder name in the repository).
greedy (bool) – False for exact match, True for greedy/partial match.
branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.
- Returns:
None for no matches, single object for unique match, list of objects for multiple matches.
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> bucket = nipyapi.versioning.get_git_registry_bucket(
...     client.id, 'flows', greedy=False
... )
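Several functions in this module share the same filtering contract: None for no matches, a single object for a unique match, and a list for multiple matches. The sketch below shows that contract on plain dicts. `filter_by_name` is a hypothetical helper, and treating a greedy match as a substring test is an assumption about what "partial match" means.

```python
def filter_by_name(items, identifier, greedy=True):
    """Return None, one item, or a list, depending on the match count.

    Hypothetical helper for illustration; not part of nipyapi.
    """
    if greedy:
        # Assumption: greedy/partial means substring match
        matches = [i for i in items if identifier in i["name"]]
    else:
        matches = [i for i in items if i["name"] == identifier]
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    return matches


buckets = [{"name": "flows"}, {"name": "flows-archive"}, {"name": "templates"}]
exact = filter_by_name(buckets, "flows", greedy=False)
partial = filter_by_name(buckets, "flows", greedy=True)
missing = filter_by_name(buckets, "nope")
```

Because the return type varies with the number of matches, callers that need exactly one object are usually better served by greedy=False followed by a None check.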
- nipyapi.versioning.get_git_registry_flow(registry_client_id, bucket_id, identifier, greedy=True, branch=None)[source]
Filters the flow list in a bucket from a Git-based registry client.
- Parameters:
registry_client_id (str) – The ID of the Git-based registry client.
bucket_id (str) – The bucket name (folder name) containing the flows.
identifier (str) – The flow name (filename without .json).
greedy (bool) – False for exact match, True for greedy/partial match.
branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.
- Returns:
None for no matches, single object for unique match, list of objects for multiple matches.
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> flow = nipyapi.versioning.get_git_registry_flow(
...     client.id, 'flows', 'http-responder', greedy=False
... )
- nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]
Gets the most recent version of a VersionedFlowSnapshot from a bucket
- nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name', greedy=True)[source]
Filters the Bucket list to a particular identifier
- nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]
Filters the Registry clients to a particular identifier
- nipyapi.versioning.get_version_info(process_group)[source]
Gets the Version Control information for a particular Process Group
- Parameters:
process_group (ProcessGroupEntity) – the ProcessGroup to work with
- Returns:
- nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]
Imports a given encoded_flow version into the bucket and flow described. May optionally be passed a file to read the encoded flow contents from.
Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.
- Parameters:
bucket_id (str) – UUID of the bucket to write the encoded_flow version
encoded_flow (Optional [str]) – The encoded flow to import; if not specified file_path is read from.
file_path (Optional [str]) – The file path to read the encoded flow from; if not specified, encoded_flow is used.
flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.
flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.
- Returns:
The new (VersionedFlowSnapshot)
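The constraint that only one of encoded_flow or file_path, and only one of flow_name or flow_id, should be specified can be checked up front. The following is a minimal sketch of such a check; `require_exactly_one` and `check_import_args` are hypothetical helpers, and the library's own validation and error type may differ.

```python
def require_exactly_one(**options):
    """Raise ValueError unless exactly one keyword argument is not None."""
    supplied = [name for name, value in options.items() if value is not None]
    if len(supplied) != 1:
        names = ", ".join(options)
        raise ValueError(f"Specify exactly one of: {names} (got {supplied})")
    return supplied[0]


def check_import_args(encoded_flow=None, file_path=None,
                      flow_name=None, flow_id=None):
    """Validate both mutually exclusive pairs; return the chosen names."""
    source = require_exactly_one(encoded_flow=encoded_flow,
                                 file_path=file_path)
    target = require_exactly_one(flow_name=flow_name, flow_id=flow_id)
    return source, target


chosen = check_import_args(file_path="my-flow.json", flow_name="my-flow")
```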
- nipyapi.versioning.import_process_group_definition(parent_pg, flow_definition=None, file_path=None, position=None)[source]
Import a flow definition as a new process group (NiFi 2.x format). Does NOT require NiFi Registry - imports from flow definition JSON/YAML.
- Parameters:
parent_pg (ProcessGroupEntity) – Parent process group to import into
flow_definition (str, optional) – Flow definition as JSON or YAML string. Either this or file_path must be provided, but not both
file_path (str, optional) – Path to flow definition file to import. Either this or flow_definition must be provided, but not both
position (tuple, optional) – (x, y) coordinates for the new process group. Defaults to (0, 0)
- Returns:
The newly imported process group
- Return type:
Example
>>> root_pg = nipyapi.canvas.get_process_group(
...     nipyapi.canvas.get_root_pg_id(), 'id'
... )
>>> imported_pg = nipyapi.versioning.import_process_group_definition(
...     parent_pg=root_pg,
...     file_path='my-flow.json',
...     position=(100, 100)
... )
- nipyapi.versioning.list_flow_versions(bucket_id, flow_id, registry_id=None, service='registry')[source]
EXPERIMENTAL List all the versions of a given Flow in a given Bucket
- Parameters:
bucket_id (str) – UUID of the bucket holding the flow to be enumerated
flow_id (str) – UUID of the flow in the bucket to be enumerated
registry_id (str) – UUID of the registry client linking the bucket, only required if requesting flows via NiFi instead of directly from Registry
service (str) – Accepts ‘nifi’ or ‘registry’, indicating which service to query
- Returns:
list(VersionedFlowSnapshotMetadata) or (VersionedFlowSnapshotMetadataSetEntity)
- nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]
List of all Flows in a given NiFi Registry Bucket
- Parameters:
bucket_id (str) – The UUID of the Bucket to fetch from
- Returns:
(list[VersionedFlow]) objects
- nipyapi.versioning.list_git_registry_buckets(registry_client_id, branch=None)[source]
List buckets (folders) from a Git-based registry client.
This function queries a Git-based Flow Registry Client (GitHub, GitLab, Bitbucket, or Azure DevOps) via the NiFi FlowApi to list available buckets. In Git-based registries, buckets correspond to folders in the repository.
- Parameters:
- Returns:
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> buckets = nipyapi.versioning.list_git_registry_buckets(client.id)
>>> for b in buckets.buckets:
...     print(f"{b.id}: {b.bucket.name}")
- nipyapi.versioning.list_git_registry_flow_versions(registry_client_id, bucket_id, flow_id, branch=None)[source]
List all versions of a flow from a Git-based registry client.
- Parameters:
registry_client_id (str) – The ID of the Git-based registry client.
bucket_id (str) – The bucket name (folder name) containing the flow.
flow_id (str) – The flow name (filename without .json) to list versions.
branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.
- Returns:
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> versions = nipyapi.versioning.list_git_registry_flow_versions(
...     client.id, 'flows', 'http-responder'
... )
- nipyapi.versioning.list_git_registry_flows(registry_client_id, bucket_id, branch=None)[source]
List flows in a bucket from a Git-based registry client.
This function queries a Git-based Flow Registry Client to list available flows within a specific bucket. In Git-based registries, flows are JSON files within the bucket folder (e.g., flows/my-flow.json).
- Parameters:
- Returns:
Example
>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> flows = nipyapi.versioning.list_git_registry_flows(client.id, 'flows')
>>> for f in flows.versioned_flows:
...     print(f.versioned_flow.flow_id)
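In Git-based registries a bucket is a folder and a flow is a JSON file inside it (e.g., flows/my-flow.json), so the bucket_id and flow_id arguments used throughout this module can be derived from a repository path. The sketch below shows that mapping. `split_flow_path` is a hypothetical helper, and it assumes a flat `bucket/flow.json` layout with no additional repository path prefix.

```python
from pathlib import PurePosixPath


def split_flow_path(repo_path):
    """Split 'bucket/flow.json' into (bucket name, flow name).

    Hypothetical helper for illustration; not part of nipyapi.
    """
    path = PurePosixPath(repo_path)
    # Assumption: exactly one folder level, and flows end in .json
    if path.suffix != ".json" or len(path.parts) != 2:
        raise ValueError(f"not a bucket/flow.json path: {repo_path}")
    return path.parts[0], path.stem


bucket_id, flow_id = split_flow_path("flows/my-flow.json")
```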
- nipyapi.versioning.list_registry_buckets()[source]
Lists all available Buckets in the NiFi Registry
- Returns:
objects
- Return type:
list[Bucket]
- nipyapi.versioning.list_registry_client_types()[source]
Lists all available Flow Registry Client types in the NiFi instance.
This includes built-in registry types like:
  - NiFi Registry (org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient)
  - GitHub (org.apache.nifi.github.GitHubFlowRegistryClient)
  - GitLab (org.apache.nifi.gitlab.GitLabFlowRegistryClient)
  - Bitbucket (org.apache.nifi.atlassian.bitbucket.BitbucketFlowRegistryClient)
  - Azure DevOps (org.apache.nifi.azure.devops.AzureDevOpsFlowRegistryClient)
- Returns:
List of available registry client types with their properties and descriptions
- Return type:
list[DocumentedTypeDTO]
Example
>>> types = nipyapi.versioning.list_registry_client_types()
>>> github_type = [t for t in types if 'GitHub' in t.type][0]
>>> print(github_type.type)
org.apache.nifi.github.GitHubFlowRegistryClient
- nipyapi.versioning.list_registry_clients()[source]
Lists the available Registry Clients in the NiFi Controller Services
- Returns:
objects
- Return type:
list[FlowRegistryClientEntity]
- nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True, force=False)[source]
Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow
- Parameters:
process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version
registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to
bucket (Bucket) – the Bucket on the NiFi Registry to save to
flow_name (str) – A name for the VersionedFlow in the Bucket. Note that you need either a name for a new VersionedFlow, or the ID of an existing one, to save a new version
flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow
comment (str) – A comment for the version commit
desc (str) – A description of the VersionedFlow
refresh (bool) – Whether to refresh the object revisions before action
force (bool) – Whether to Force Commit, or just regular Commit
- Returns:
- nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]
Changes a versioned flow to the specified version, or the latest version
- Parameters:
process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change
target_version (Optional [None, Int]) – Either None to move to the latest available version, or Int of the version number to move to
- Returns:
True if successful, False if not
- Return type:
(bool)
- nipyapi.versioning.update_registry_client(client, properties=None, description=None, refresh=True)[source]
Updates an existing Registry Client’s configuration.
This function merges provided properties with existing ones, allowing partial updates. Sensitive properties (e.g., tokens) are always applied if provided, since existing values cannot be inspected for comparison.
- Parameters:
client (FlowRegistryClientEntity) – The client to update
properties (dict, optional) – Properties to update. Merged with existing.
description (str, optional) – New description. If None, keeps existing.
refresh (bool) – Whether to refresh the object before action to get current revision. Defaults to True.
- Returns:
The updated client object
- Return type:
Example
>>> client = nipyapi.versioning.get_registry_client("GitHub-FlowRegistry")
>>> updated = nipyapi.versioning.update_registry_client(
...     client,
...     properties={'Default Branch': 'feature-branch'}
... )
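The partial-update behaviour described for update_registry_client (provided properties merged with existing ones) amounts to a dictionary overlay. A minimal sketch, assuming properties are plain string-keyed dicts; `merge_properties` is a hypothetical helper, not the library's internal code.

```python
def merge_properties(existing, updates=None):
    """Return a new dict: existing properties overlaid with the updates.

    Hypothetical helper for illustration; not part of nipyapi.
    """
    merged = dict(existing)       # copy so the caller's dict is untouched
    merged.update(updates or {})  # supplied keys win; others are kept
    return merged


current = {"Repository Owner": "myorg", "Default Branch": "main"}
updated_props = merge_properties(current, {"Default Branch": "feature-branch"})
```

Only the keys passed in change, which is why a call that sets just 'Default Branch' leaves the other properties in place.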