Versioning

Version control operations

For interactions with the NiFi Registry Service and related functions

nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]

Creates a new VersionedFlow stub in NiFi Registry. This can be used to write VersionedFlow information without using a NiFi Process Group directly.

Parameters:
  • bucket_id (str) – UUID of the Bucket to write to

  • flow_name (str) – Name for the new VersionedFlow object

  • flow_desc (Optional [str]) – Description for the new VersionedFlow object

  • flow_type (Optional [str]) – Type of the VersionedFlow, should be ‘Flow’

Returns:

(VersionedFlow)
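
As a rough illustration, the stub can be created in a bucket that is looked up (or created) first. This is only a sketch, assuming nipyapi is already configured against a reachable NiFi Registry; the helper name create_flow_stub is hypothetical, and the use of the bucket's `identifier` attribute as its UUID is an assumption about the Registry model.

```python
def create_flow_stub(bucket_name, flow_name, flow_desc=""):
    """Create an empty VersionedFlow in the named bucket (hypothetical helper)."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    # ensure_registry_bucket returns the existing bucket or creates a new one.
    bucket = nipyapi.versioning.ensure_registry_bucket(bucket_name)

    # Assumption: the Bucket object exposes its UUID as `identifier`.
    return nipyapi.versioning.create_flow(
        bucket_id=bucket.identifier,
        flow_name=flow_name,
        flow_desc=flow_desc,
    )
```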

nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]

EXPERIMENTAL

Writes a FlowSnapshot into a VersionedFlow as a new version update

Note that this differs from save_flow_ver, which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.

Parameters:
  • flow (VersionedFlowObject) – the VersionedFlow object to write to

  • flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow

  • refresh (bool) – Whether to refresh the object status before actioning

Returns:

The new (VersionedFlowSnapshot)

nipyapi.versioning.create_registry_bucket(name, description=None)[source]

Creates a new Registry Bucket

Parameters:
  • name (str) – name for the bucket, must be unique in the Registry

  • description (str, optional) – description for the bucket

Returns:

The new Bucket object

Return type:

Bucket

nipyapi.versioning.create_registry_client(name, uri=None, description='', reg_type=None, properties=None, ssl_context_service=None)[source]

Creates a Registry Client in the NiFi Controller Services

Parameters:
  • name (str) – The name of the new Client

  • uri (str, optional) – The URI for the connection. Required for NiFi Registry. Ignored for Git-based registries. Defaults to None.

  • description (str, optional) – A description for the Client. Defaults to empty string.

  • reg_type (str, optional) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’. Other options include:

      - ‘org.apache.nifi.github.GitHubFlowRegistryClient’
      - ‘org.apache.nifi.gitlab.GitLabFlowRegistryClient’
      - ‘org.apache.nifi.atlassian.bitbucket.BitbucketFlowRegistryClient’
      - ‘org.apache.nifi.azure.devops.AzureDevOpsFlowRegistryClient’

  • properties (dict, optional) – Properties to configure the client. If provided, these are used directly. If not provided, defaults are used based on reg_type. For NiFi Registry, defaults to {‘url’: uri} if uri is provided. For Git-based registries, starts empty (must be configured).

  • ssl_context_service (ControllerServiceEntity, optional) – SSL Context Service (only applicable for NiFi Registry type). Defaults to None.

Returns:

The new registry client object

Return type:

FlowRegistryClientEntity

Example

>>> # NiFi Registry client
>>> nifi_reg = nipyapi.versioning.create_registry_client(
...     name='my-registry',
...     uri='http://localhost:18080',
...     description='My NiFi Registry'
... )
>>> # GitHub client with properties
>>> github_client = nipyapi.versioning.create_registry_client(
...     name='github-reg',
...     reg_type='org.apache.nifi.github.GitHubFlowRegistryClient',
...     description='GitHub Registry',
...     properties={
...         'Repository Owner': 'myorg',
...         'Repository Name': 'myrepo',
...         'Authentication Type': 'PERSONAL_ACCESS_TOKEN',
...         'Personal Access Token': 'ghp_xxx...',
...         'Default Branch': 'main'
...     }
... )

nipyapi.versioning.delete_registry_bucket(bucket)[source]

Removes a bucket from the NiFi Registry

Parameters:

bucket (Bucket) – the Bucket object to remove

Returns:

The updated Bucket object

Return type:

(Bucket)

nipyapi.versioning.delete_registry_client(client, refresh=True)[source]

Deletes a Registry Client from the list of NiFi Controller Services

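Parameters:
  • client (FlowRegistryClientEntity) – the Registry Client to delete

  • refresh (bool) – Whether to refresh the object before action

Returns: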

The updated client object

Return type:

(FlowRegistryClientEntity)

nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]

Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.

Parameters:
  • parent_id (str) – The ID of the parent Process Group to create the new process group in.

  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent

  • bucket_id (str) – ID of the bucket containing the versioned flow to deploy.

  • reg_client_id (str) – ID of the registry client connection to use.

  • flow_id (str) – ID of the versioned flow to deploy.

  • version (Optional [int,str]) – version to deploy; if not provided, the latest version will be deployed.

Returns:

(ProcessGroupEntity) of the newly deployed Process Group
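
The lookups needed to fill in these IDs can be chained together. The following is a minimal sketch, assuming a configured NiFi and NiFi Registry, and assuming each lookup returns exactly one object rather than None or a list; the helper name deploy_latest and the `identifier` attributes on the bucket and flow objects are assumptions.

```python
def deploy_latest(client_name, bucket_name, flow_name, location=(0, 0)):
    """Deploy the latest version of a flow under the root Process Group."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    v = nipyapi.versioning
    # Exact-match lookups; each is assumed to return a single object.
    client = v.get_registry_client(client_name)
    bucket = v.get_registry_bucket(bucket_name, greedy=False)
    flow = v.get_flow_in_bucket(bucket.identifier, flow_name, greedy=False)

    return v.deploy_flow_version(
        parent_id=nipyapi.canvas.get_root_pg_id(),
        location=location,
        bucket_id=bucket.identifier,
        flow_id=flow.identifier,
        reg_client_id=client.id,
        version=None,  # None deploys the latest version
    )
```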

nipyapi.versioning.deploy_git_registry_flow(registry_client_id, bucket_id, flow_id, parent_id, location=None, version=None, branch=None)[source]

Deploy a flow from a Git-based registry to the NiFi canvas.

Creates a new Process Group under the specified parent, linked to version control from the Git-based registry. The deployed flow will track the specified version (or latest) from the repository.

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • bucket_id (str) – The bucket name (folder name) containing the flow.

  • flow_id (str) – The flow name (filename without .json).

  • parent_id (str) – The ID of the parent Process Group to deploy into.

  • location (tuple, optional) – (x, y) coordinates for placement. Defaults to (0, 0).

  • version (str, optional) – Specific version (commit hash) to deploy. If None, deploys the latest version.

  • branch (str, optional) – The branch to deploy from. If None, uses the registry client’s configured default branch.

Returns:

The newly deployed Process Group.

Return type:

ProcessGroupEntity

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> root_id = nipyapi.canvas.get_root_pg_id()
>>> pg = nipyapi.versioning.deploy_git_registry_flow(
...     client.id, 'flows', 'http-responder', root_id
... )

nipyapi.versioning.ensure_registry_bucket(name, description=None)[source]

Ensures a Registry Bucket exists, creating it if necessary.

This is a convenience function that implements the common pattern of:

1. Try to get existing bucket by name
2. If not found, create it
3. Handle race conditions gracefully

Parameters:
  • name (str) – name for the bucket, must be unique in the Registry

  • description (str, optional) – description for the bucket (only used if creating new)

Returns:

The bucket object (existing or new)

Return type:

(Bucket)
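
The get-or-create pattern described for this function can be sketched in plain Python. This illustrates the general approach only and is not nipyapi's actual implementation; get_by_name, create, and AlreadyExists are hypothetical stand-ins for the Registry calls and the conflict error.

```python
class AlreadyExists(Exception):
    """Hypothetical error raised when a create call loses a race."""


def ensure(name, get_by_name, create):
    """Return the object called `name`, creating it only if it is missing."""
    existing = get_by_name(name)  # 1. try to get it by name
    if existing is not None:
        return existing
    try:
        return create(name)  # 2. not found, so create it
    except AlreadyExists:
        # 3. another caller created it in the meantime, so fetch that one
        return get_by_name(name)
```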

nipyapi.versioning.ensure_registry_client(name, uri=None, description='', reg_type=None, properties=None, ssl_context_service=None)[source]

Ensures a Registry Client exists with the desired configuration.

This is a convenience function that implements the common pattern of:

1. Try to get existing client by name
2. If found and properties provided, update the client
3. If not found, create it
4. Handle race conditions gracefully

For Git-based registries (GitHub, GitLab, etc.), if properties are provided and client exists, the client will be updated. This supports CI/CD workflows where branch or credentials may change between deployments. Sensitive properties (like tokens) are always applied if provided since existing values cannot be inspected.

Parameters:
  • name (str) – The name of the Client

  • uri (str, optional) – The URI for the connection. Required for NiFi Registry. Ignored for Git-based registries. Defaults to None.

  • description (str, optional) – A description for the Client. Defaults to empty string.

  • reg_type (str, optional) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’

  • properties (dict, optional) – Properties to configure the client. If provided and client exists, will update the existing client with these properties.

  • ssl_context_service (ControllerServiceEntity, optional) – SSL Context Service

Returns:

The registry client object (existing, updated, or new)

Return type:

(FlowRegistryClientEntity)

Note

For NiFi Registry clients, URI mismatch triggers recreation. For Git-based clients, properties trigger an update (not recreation).

nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]

Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow

  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket

  • version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version

  • file_path (str) – The path and filename to write to. Defaults to None, which returns the serialised object

  • mode (str) – ‘json’ or ‘yaml’ to specify the encoding format

Returns:

(str) of the encoded Snapshot
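
Since the encoded string is returned when file_path is None, it can be passed straight to import_flow_version. The sketch below copies the latest version of a flow into another bucket as the first version of a new flow; the helper name copy_flow_version is hypothetical, and a configured NiFi Registry is assumed.

```python
def copy_flow_version(src_bucket_id, flow_id, dst_bucket_id, new_flow_name):
    """Export the latest version of a flow and import it into another bucket."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    # file_path is left as None, so the serialised snapshot is returned.
    encoded = nipyapi.versioning.export_flow_version(
        src_bucket_id, flow_id, version=None, mode="json"
    )
    # flow_name (not flow_id) because this becomes a new flow object.
    return nipyapi.versioning.import_flow_version(
        dst_bucket_id, encoded_flow=encoded, flow_name=new_flow_name
    )
```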

nipyapi.versioning.export_process_group_definition(process_group, file_path=None, mode='json')[source]

Export a process group as a flow definition (NiFi 2.x format). Does NOT require NiFi Registry - exports the current state of the process group.

Parameters:
  • process_group (ProcessGroupEntity) – The process group to export

  • file_path (str, optional) – Path to write the export to. If None, returns the serialized string

  • mode (str) – Export format - ‘json’ or ‘yaml’. Defaults to ‘json’

Returns:

The serialized flow definition if file_path is None, otherwise the path written to

Return type:

str

Example

>>> pg = nipyapi.canvas.get_process_group('my-flow')
>>> nipyapi.versioning.export_process_group_definition(
...     pg, file_path='my-flow.json', mode='json'
... )
'my-flow.json'

nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name', greedy=True)[source]

Filters the Flows in a Bucket against a particular identifier

Parameters:
  • bucket_id (str) – UUID of the Bucket to filter against

  • identifier (str) – The string to filter on

  • identifier_type (str) – The param to check

  • greedy (bool) – False for exact match, True for greedy match

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches
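
Because the return shape depends on the number of matches, callers may find it convenient to normalise it before iterating. The helper below is a hypothetical convenience written for this illustration and is not part of nipyapi.

```python
def as_list(result):
    """Normalise a filter result (None, one object, or a list) to a list."""
    if result is None:
        return []  # no matches
    if isinstance(result, list):
        return result  # multiple matches
    return [result]  # unique match
```

The same wrapper should suit get_registry_bucket and get_registry_client, which document the same three return shapes.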

nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]

Retrieves the latest, or a specific, version of a Flow

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow

  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket

  • version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version

  • export (bool) – True to get the raw json object from the server for export, False to get the native DataType

Returns:

(VersionedFlowSnapshot) if export=False, or the raw JSON otherwise

WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135, which means the version count cannot always be trusted.

nipyapi.versioning.get_git_registry_bucket(registry_client_id, identifier, greedy=True, branch=None)[source]

Filters the bucket list from a Git-based registry client.

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • identifier (str) – The bucket name (folder name in the repository).

  • greedy (bool) – False for exact match, True for greedy/partial match.

  • branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.

Returns:

None for no matches, single object for unique match, list of objects for multiple matches.

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> bucket = nipyapi.versioning.get_git_registry_bucket(
...     client.id, 'flows', greedy=False
... )

nipyapi.versioning.get_git_registry_flow(registry_client_id, bucket_id, identifier, greedy=True, branch=None)[source]

Filters the flow list in a bucket from a Git-based registry client.

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • bucket_id (str) – The bucket name (folder name) containing the flows.

  • identifier (str) – The flow name (filename without .json).

  • greedy (bool) – False for exact match, True for greedy/partial match.

  • branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.

Returns:

None for no matches, single object for unique match, list of objects for multiple matches.

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> flow = nipyapi.versioning.get_git_registry_flow(
...     client.id, 'flows', 'http-responder', greedy=False
... )

nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]

Gets the most recent version of a VersionedFlowSnapshot from a bucket

Parameters:
  • bucket_id (str) – the UUID of the Bucket containing the flow

  • flow_id (str) – the UUID of the VersionedFlow to be retrieved

Returns:

(VersionedFlowSnapshot)

nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name', greedy=True)[source]

Filters the Bucket list to a particular identifier

Parameters:
  • identifier (str) – the filter string

  • identifier_type (str) – the param to filter on

  • greedy (bool) – False for exact match, True for greedy match

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]

Filters the Registry clients to a particular identifier

Parameters:
  • identifier (str) – the filter string

  • identifier_type (str) – the parameter to filter on

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_version_info(process_group)[source]

Gets the Version Control information for a particular Process Group

Parameters:

process_group (ProcessGroupEntity) – the ProcessGroup to work with

Returns:

VersionControlInformationEntity
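
As an illustration, the returned entity can be summarised for logging. This sketch assumes the entity carries a `version_control_information` attribute with `version` and `state` fields, and that it is None when the Process Group is not under version control; these attribute names are assumptions about the NiFi model and should be checked against your nipyapi version. The helper name describe_version_state is hypothetical.

```python
def describe_version_state(process_group):
    """Return a short text summary of a Process Group's version control state."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    info = nipyapi.versioning.get_version_info(process_group)
    # Assumption: the nested DTO is None for unversioned Process Groups.
    vci = info.version_control_information
    if vci is None:
        return "not under version control"
    return f"version {vci.version}, state {vci.state}"
```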

nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]

Imports a given encoded_flow version into the bucket and flow described. The encoded flow contents may optionally be read from a file instead.

Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.

Parameters:
  • bucket_id (str) – UUID of the bucket to write the encoded_flow version

  • encoded_flow (Optional [str]) – The encoded flow to import; if not specified, the flow is read from file_path.

  • file_path (Optional [str]) – The file path to read the encoded flow from; if not specified, encoded_flow is used.

  • flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.

  • flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.

Returns:

The new (VersionedFlowSnapshot)
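
The argument rules noted for this function can be checked before calling it. The following is a hypothetical pre-flight check that treats the rules as "exactly one of each pair"; it is not the validation nipyapi itself performs.

```python
def check_import_args(encoded_flow=None, file_path=None,
                      flow_name=None, flow_id=None):
    """Raise ValueError unless exactly one option of each pair is given."""
    # Source of the flow: either the encoded string or a file, not both.
    if (encoded_flow is None) == (file_path is None):
        raise ValueError("Specify exactly one of encoded_flow or file_path")
    # Target of the import: a new flow by name, or an existing flow by ID.
    if (flow_name is None) == (flow_id is None):
        raise ValueError("Specify exactly one of flow_name or flow_id")
    return True
```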

nipyapi.versioning.import_process_group_definition(parent_pg, flow_definition=None, file_path=None, position=None)[source]

Import a flow definition as a new process group (NiFi 2.x format). Does NOT require NiFi Registry - imports from flow definition JSON/YAML.

Parameters:
  • parent_pg (ProcessGroupEntity) – Parent process group to import into

  • flow_definition (str, optional) – Flow definition as JSON or YAML string. Either this or file_path must be provided, but not both

  • file_path (str, optional) – Path to flow definition file to import. Either this or flow_definition must be provided, but not both

  • position (tuple, optional) – (x, y) coordinates for the new process group. Defaults to (0, 0)

Returns:

The newly imported process group

Return type:

ProcessGroupEntity

Example

>>> root_pg = nipyapi.canvas.get_process_group(
...     nipyapi.canvas.get_root_pg_id(), 'id'
... )
>>> imported_pg = nipyapi.versioning.import_process_group_definition(
...     parent_pg=root_pg,
...     file_path='my-flow.json',
...     position=(100, 100)
... )

nipyapi.versioning.list_flow_versions(bucket_id, flow_id, registry_id=None, service='registry')[source]

EXPERIMENTAL

Lists all the versions of a given Flow in a given Bucket

Parameters:
  • bucket_id (str) – UUID of the bucket holding the flow to be enumerated

  • flow_id (str) – UUID of the flow in the bucket to be enumerated

  • registry_id (str) – UUID of the registry client linking the bucket, only required if requesting flows via NiFi instead of directly from Registry

  • service (str) – Accepts ‘nifi’ or ‘registry’, indicating which service to query

Returns:

list(VersionedFlowSnapshotMetadata) or (VersionedFlowSnapshotMetadataSetEntity)

nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]

List of all Flows in a given NiFi Registry Bucket

Parameters:

bucket_id (str) – The UUID of the Bucket to fetch from

Returns:

(list[VersionedFlow]) objects

nipyapi.versioning.list_git_registry_buckets(registry_client_id, branch=None)[source]

List buckets (folders) from a Git-based registry client.

This function queries a Git-based Flow Registry Client (GitHub, GitLab, Bitbucket, or Azure DevOps) via the NiFi FlowApi to list available buckets. In Git-based registries, buckets correspond to folders in the repository.

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.

Returns:

FlowRegistryBucketsEntity

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> buckets = nipyapi.versioning.list_git_registry_buckets(client.id)
>>> for b in buckets.buckets:
...     print(f"{b.id}: {b.bucket.name}")

nipyapi.versioning.list_git_registry_flow_versions(registry_client_id, bucket_id, flow_id, branch=None)[source]

List all versions of a flow from a Git-based registry client.

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • bucket_id (str) – The bucket name (folder name) containing the flow.

  • flow_id (str) – The flow name (filename without .json) to list versions.

  • branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.

Returns:

VersionedFlowSnapshotMetadataSetEntity

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> versions = nipyapi.versioning.list_git_registry_flow_versions(
...     client.id, 'flows', 'http-responder'
... )

nipyapi.versioning.list_git_registry_flows(registry_client_id, bucket_id, branch=None)[source]

List flows in a bucket from a Git-based registry client.

This function queries a Git-based Flow Registry Client to list available flows within a specific bucket. In Git-based registries, flows are JSON files within the bucket folder (e.g., flows/my-flow.json).

Parameters:
  • registry_client_id (str) – The ID of the Git-based registry client.

  • bucket_id (str) – The bucket ID (folder name) to list flows from.

  • branch (str, optional) – The branch to query. If None, uses the registry client’s configured default branch.

Returns:

VersionedFlowsEntity

Example

>>> client = nipyapi.versioning.get_registry_client('my-github-client')
>>> flows = nipyapi.versioning.list_git_registry_flows(client.id, 'flows')
>>> for f in flows.versioned_flows:
...     print(f.versioned_flow.flow_id)

nipyapi.versioning.list_registry_buckets()[source]

Lists all available Buckets in the NiFi Registry

Returns:

A list of Bucket objects

Return type:

list[Bucket]

nipyapi.versioning.list_registry_client_types()[source]

Lists all available Flow Registry Client types in the NiFi instance.

This includes built-in registry types like:

- NiFi Registry (org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient)
- GitHub (org.apache.nifi.github.GitHubFlowRegistryClient)
- GitLab (org.apache.nifi.gitlab.GitLabFlowRegistryClient)
- Bitbucket (org.apache.nifi.atlassian.bitbucket.BitbucketFlowRegistryClient)
- Azure DevOps (org.apache.nifi.azure.devops.AzureDevOpsFlowRegistryClient)

Returns:

List of available registry client types with their properties and descriptions

Return type:

list[DocumentedTypeDTO]

Example

>>> types = nipyapi.versioning.list_registry_client_types()
>>> github_type = [t for t in types if 'GitHub' in t.type][0]
>>> print(github_type.type)
org.apache.nifi.github.GitHubFlowRegistryClient

nipyapi.versioning.list_registry_clients()[source]

Lists the available Registry Clients in the NiFi Controller Services

Returns:

A list of FlowRegistryClientEntity objects

Return type:

list[FlowRegistryClientEntity]

nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True, force=False)[source]

Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow

Parameters:
  • process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version

  • registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to

  • bucket (Bucket) – the Bucket on the NiFi Registry to save to

  • flow_name (str) – A name for the VersionedFlow in the Bucket. Note that you need either a name for a new VersionedFlow, or the ID of an existing one, to save a new version

  • flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow

  • comment (str) – A comment for the version commit

  • desc (str) – A description of the VersionedFlow

  • refresh (bool) – Whether to refresh the object revisions before action

  • force (bool) – Whether to Force Commit, or just regular Commit

Returns:

VersionControlInformationEntity
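
A first save into version control might look like the following. This is a sketch only, assuming a configured NiFi with a Registry Client and Bucket that already exist, and assuming each name lookup returns a single object; the helper name save_new_flow is hypothetical.

```python
def save_new_flow(pg_name, client_name, bucket_name, flow_name, comment=""):
    """Place a Process Group under version control as a new VersionedFlow."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    pg = nipyapi.canvas.get_process_group(pg_name)
    client = nipyapi.versioning.get_registry_client(client_name)
    bucket = nipyapi.versioning.get_registry_bucket(bucket_name, greedy=False)

    # flow_name is given (rather than flow_id) because the flow is new.
    return nipyapi.versioning.save_flow_ver(
        process_group=pg,
        registry_client=client,
        bucket=bucket,
        flow_name=flow_name,
        comment=comment,
    )
```

To add a version to an existing VersionedFlow, flow_id would be passed in place of flow_name.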

nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]

Changes a versioned flow to the specified version, or the latest version

Parameters:
  • process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change

  • target_version (Optional [None, Int]) – Either None to move to the latest available version, or Int of the version number to move to

Returns:

True if successful, False if not

Return type:

(bool)
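
Since the function reports success as a boolean, a caller may want to turn a failure into an exception and then re-read the version information. A minimal sketch, assuming a configured NiFi and a Process Group already under version control; the helper name change_version is hypothetical.

```python
def change_version(pg_name, target_version=None):
    """Move a versioned Process Group to a version and return its new info."""
    # Imported lazily so this sketch can be loaded without a NiFi connection.
    import nipyapi

    pg = nipyapi.canvas.get_process_group(pg_name)
    # target_version=None requests the latest available version.
    if not nipyapi.versioning.update_flow_ver(pg, target_version):
        raise RuntimeError(f"Could not change version of {pg_name!r}")
    return nipyapi.versioning.get_version_info(pg)
```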

nipyapi.versioning.update_registry_client(client, properties=None, description=None, refresh=True)[source]

Updates an existing Registry Client’s configuration.

This function merges provided properties with existing ones, allowing partial updates. Sensitive properties (e.g., tokens) are always applied if provided, since existing values cannot be inspected for comparison.

Parameters:
  • client (FlowRegistryClientEntity) – The client to update

  • properties (dict, optional) – Properties to update. Merged with existing.

  • description (str, optional) – New description. If None, keeps existing.

  • refresh (bool) – Whether to refresh the object before action to get current revision. Defaults to True.

Returns:

The updated client object

Return type:

(FlowRegistryClientEntity)

Example

>>> client = nipyapi.versioning.get_registry_client("GitHub-FlowRegistry")
>>> updated = nipyapi.versioning.update_registry_client(
...     client,
...     properties={'Default Branch': 'feature-branch'}
... )
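
The partial-update behaviour described for this function amounts to a dictionary merge in which provided values win. The sketch below illustrates that semantics in plain Python; it is not nipyapi's implementation, and merge_properties is a hypothetical name.

```python
def merge_properties(existing, updates):
    """Return existing properties overlaid with the provided updates."""
    merged = dict(existing or {})  # copy, so the input is not mutated
    merged.update(updates or {})   # provided keys override existing ones
    return merged
```

With this semantics, updating only 'Default Branch' leaves properties such as 'Repository Owner' untouched.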