Versioning

Version control operations

For interactions with the NiFi Registry Service and related functions

nipyapi.versioning.create_flow(bucket_id, flow_name, flow_desc='', flow_type='Flow')[source]

Creates a new VersionedFlow stub in NiFi Registry. Can be used to write VersionedFlow information to without using a NiFi Process Group directly

Parameters:
  • bucket_id (str) – UUID of the Bucket to write to

  • flow_name (str) – Name for the new VersionedFlow object

  • flow_desc (Optional [str]) – Description for the new VersionedFlow object

  • flow_type (Optional [str]) – Type of the VersionedFlow, should be ‘Flow’

Returns:

(VersionedFlow)

nipyapi.versioning.create_flow_version(flow, flow_snapshot, refresh=True)[source]

EXPERIMENTAL

Writes a FlowSnapshot into a VersionedFlow as a new version update

Note that this differs from save_flow_ver which creates a new Flow Version containing the snapshot. This function writes a snapshot to an existing Flow Version. Useful in migrating Flow Versions between environments.

Parameters:
  • flow (VersionedFlowObject) – the VersionedFlow object to write to

  • flow_snapshot (VersionedFlowSnapshot) – the Snapshot to write into the VersionedFlow

  • refresh (bool) – Whether to refresh the object status before actioning

Returns:

The new (VersionedFlowSnapshot)

nipyapi.versioning.create_registry_bucket(name, description=None)[source]

Creates a new Registry Bucket

Parameters:
  • name (str) – name for the bucket, must be unique in the Registry

  • description (str, optional) – description for the bucket

Returns:

The new Bucket object

Return type:

Bucket

nipyapi.versioning.create_registry_client(name, uri, description, reg_type=None, ssl_context_service=None)[source]

Creates a Registry Client in the NiFi Controller Services

Parameters:
  • name (str) – The name of the new Client

  • uri (str) – The URI for the connection

  • description (str) – A description for the Client

  • reg_type (str) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’

  • ssl_context_service (ControllerServiceEntity) – Optional SSL Context Service

Returns:

The new registry client object

Return type:

FlowRegistryClientEntity

nipyapi.versioning.delete_registry_bucket(bucket)[source]

Removes a bucket from the NiFi Registry

Parameters:

bucket (Bucket) – the Bucket object to remove

Returns:

The updated Bucket object

Return type:

(Bucket)

nipyapi.versioning.delete_registry_client(client, refresh=True)[source]

Deletes a Registry Client from the list of NiFI Controller Services

Parameters:
Returns:

The updated client object

Return type:

(FlowRegistryClientEntity)

nipyapi.versioning.deploy_flow_version(parent_id, location, bucket_id, flow_id, reg_client_id, version=None)[source]

Deploys a versioned flow as a new process group inside the given parent process group. If version is not provided, the latest version will be deployed.

Parameters:
  • parent_id (str) – The ID of the parent Process Group to create the new process group in.

  • location (tuple[x, y]) – the x,y coordinates to place the new Process Group under the parent

  • bucket_id (str) – ID of the bucket containing the versioned flow to deploy.

  • reg_client_id (str) – ID of the registry client connection to use.

  • flow_id (str) – ID of the versioned flow to deploy.

  • version (Optional [int,str]) – version to deploy, if not provided latest version will be deployed.

Returns:

(ProcessGroupEntity) of the newly deployed Process Group

nipyapi.versioning.ensure_registry_bucket(name, description=None)[source]

Ensures a Registry Bucket exists, creating it if necessary.

This is a convenience function that implements the common pattern of: 1. Try to get existing bucket by name 2. If not found, create it 3. Handle race conditions gracefully

Parameters:
  • name (str) – name for the bucket, must be unique in the Registry

  • description (str, optional) – description for the bucket (only used if creating new)

Returns:

The bucket object (existing or new)

Return type:

(Bucket)

nipyapi.versioning.ensure_registry_client(name, uri, description, reg_type=None, ssl_context_service=None)[source]

Ensures a Registry Client exists, creating it if necessary.

This is a convenience function that implements the common pattern of: 1. Try to get existing client by name 2. If not found, create it 3. Handle race conditions gracefully

Parameters:
  • name (str) – The name of the Client

  • uri (str) – The URI for the connection

  • description (str) – A description for the Client

  • reg_type (str) – The type of registry client to create. Defaults to ‘org.apache.nifi.registry.flow.NifiRegistryFlowRegistryClient’

  • ssl_context_service (ControllerServiceEntity) – Optional SSL Context Service

Returns:

The registry client object (existing or new)

Return type:

(FlowRegistryClientEntity)

nipyapi.versioning.export_flow_version(bucket_id, flow_id, version=None, file_path=None, mode='json')[source]

Convenience method to export the identified VersionedFlowSnapshot in the provided format mode.

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow

  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket

  • version (Optional [None, Str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version

  • file_path (str) – The path and filename to write to. Defaults to None which returns the serialised obj

  • mode (str) – ‘json’ or ‘yaml’ to specific the encoding format

Returns:

(str) of the encoded Snapshot

nipyapi.versioning.get_flow_in_bucket(bucket_id, identifier, identifier_type='name', greedy=True)[source]

Filters the Flows in a Bucket against a particular identifier

Parameters:
  • bucket_id (str) – UUID of the Bucket to filter against

  • identifier (str) – The string to filter on

  • identifier_type (str) – The param to check

  • greedy (bool) – False for exact match, True for greedy match

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_flow_version(bucket_id, flow_id, version=None, export=False)[source]

Retrieves the latest, or a specific, version of a Flow

Parameters:
  • bucket_id (str) – the UUID of the bucket containing the Flow

  • flow_id (str) – the UUID of the Flow to be retrieved from the Bucket

  • version (Optional [None, str]) – ‘None’ to retrieve the latest version, or a version number as a string to get that version

  • export (bool) – True to get the raw json object from the server for export, False to get the native DataType

Returns:

If export=False, or the raw json otherwise

Return type:

(VersionedFlowSnapshot)

WARNING: This call is impacted by https://issues.apache.org/jira/browse/NIFIREG-135 Which means you sometimes can’t trust the version count

nipyapi.versioning.get_latest_flow_ver(bucket_id, flow_id)[source]

Gets the most recent version of a VersionedFlowSnapshot from a bucket

Parameters:
  • bucket_id (str) – the UUID of the Bucket containing the flow

  • flow_id (str) – the UUID of the VersionedFlow to be retrieved

Returns:

(VersionedFlowSnapshot)

nipyapi.versioning.get_registry_bucket(identifier, identifier_type='name', greedy=True)[source]

Filters the Bucket list to a particular identifier

Parameters:
  • identifier (str) – the filter string

  • identifier_type (str) – the param to filter on

  • greedy (bool) – False for exact match, True for greedy match

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_registry_client(identifier, identifier_type='name')[source]

Filters the Registry clients to a particular identifier

Parameters:
  • identifier (str) – the filter string

  • identifier_type (str) – the parameter to filter on

Returns:

None for no matches, Single Object for unique match, list(Objects) for multiple matches

nipyapi.versioning.get_version_info(process_group)[source]

Gets the Version Control information for a particular Process Group

Parameters:

process_group (ProcessGroupEntity) – the ProcessGroup to work with

Returns:

VersionControlInformationEntity

nipyapi.versioning.import_flow_version(bucket_id, encoded_flow=None, file_path=None, flow_name=None, flow_id=None)[source]

Imports a given encoded_flow version into the bucket and flow described, may optionally be passed a file to read the encoded flow_contents from.

Note that only one of encoded_flow or file_path, and only one of flow_name or flow_id should be specified.

Parameters:
  • bucket_id (str) – UUID of the bucket to write the encoded_flow version

  • encoded_flow (Optional [str]) – The encoded flow to import; if not specified file_path is read from.

  • file_path (Optional [str]) – The file path to read the encoded flow from , if not specified encoded_flow is read from.

  • flow_name (Optional [str]) – If this is to be the first version in a new flow object, then this is the String name for the flow object.

  • flow_id (Optional [str]) – If this is a new version for an existing flow object, then this is the ID of that object.

Returns:

The new (VersionedFlowSnapshot)

nipyapi.versioning.list_flow_versions(bucket_id, flow_id, registry_id=None, service='registry')[source]

EXPERIMENTAL List all the versions of a given Flow in a given Bucket

Parameters:
  • bucket_id (str) – UUID of the bucket holding the flow to be enumerated

  • flow_id (str) – UUID of the flow in the bucket to be enumerated

  • registry_id (str) – UUID of the registry client linking the bucket, only required if requesting flows via NiFi instead of directly Registry

  • service (str) – Accepts ‘nifi’ or ‘registry’, indicating which service to query

Returns:

list(VersionedFlowSnapshotMetadata) or

(VersionedFlowSnapshotMetadataSetEntity)

nipyapi.versioning.list_flows_in_bucket(bucket_id)[source]

List of all Flows in a given NiFi Registry Bucket

Parameters:

bucket_id (str) – The UUID of the Bucket to fetch from

Returns:

(list[VersionedFlow]) objects

nipyapi.versioning.list_registry_buckets()[source]

Lists all available Buckets in the NiFi Registry

Returns:

objects

Return type:

list[Bucket]

nipyapi.versioning.list_registry_clients()[source]

Lists the available Registry Clients in the NiFi Controller Services

Returns:

objects

Return type:

list[FlowRegistryClientEntity]

nipyapi.versioning.save_flow_ver(process_group, registry_client, bucket, flow_name=None, flow_id=None, comment='', desc='', refresh=True, force=False)[source]

Adds a Process Group into NiFi Registry Version Control, or saves a new version to an existing VersionedFlow with a new version

Parameters:
  • process_group (ProcessGroupEntity) – the ProcessGroup object to save as a new Flow Version

  • registry_client (RegistryClient) – The Client linked to the Registry which contains the Bucket to save to

  • bucket (Bucket) – the Bucket on the NiFi Registry to save to

  • flow_name (str) – A name for the VersionedFlow in the Bucket Note you need either a name for a new VersionedFlow, or the ID of an existing one to save a new version

  • flow_id (Optional [str]) – Identifier of an existing VersionedFlow in the bucket, if saving a new version to an existing flow

  • comment (str) – A comment for the version commit

  • desc (str) – A description of the VersionedFlow

  • refresh (bool) – Whether to refresh the object revisions before action

  • force (bool) – Whether to Force Commit, or just regular Commit

Returns:

VersionControlInformationEntity

nipyapi.versioning.update_flow_ver(process_group, target_version=None)[source]

Changes a versioned flow to the specified version, or the latest version

Parameters:
  • process_group (ProcessGroupEntity) – ProcessGroupEntity under version control to change

  • target_version (Optional [None, Int]) – Either None to move to the

  • version (latest available)

  • to (or Int of the version number to move)

Returns:

True if successful, False if not

Return type:

(bool)