CI Operations
The nipyapi.ci module provides high-level operations designed for CI/CD pipelines. These functions wrap nipyapi operations with environment variable support, sensible defaults, and simplified interfaces.
Overview
CI operations are designed for automation scenarios:
Environment variable support: Most parameters can be set via environment variables; the few exceptions are marked (none) in the parameter tables
Sensible defaults: Minimal configuration required for common use cases
Structured output: Returns plain dicts suitable for CI artifact formats
Error handling: Raises exceptions with clear error messages
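The environment-variable fallback can be pictured as a simple precedence rule. The sketch below is not nipyapi's implementation: `resolve_param` is a hypothetical helper, and the order (explicit argument, then environment variable, then default) is an assumption about how such a fallback typically behaves.

```python
import os


def resolve_param(explicit, env_name, default=None):
    """Return the explicit value if given, else the environment variable, else the default."""
    if explicit is not None:
        return explicit
    # Assumption: an unset or empty variable counts as "not provided".
    value = os.environ.get(env_name)
    return value if value else default


# NIFI_BUCKET is the variable name used in the GitLab CI example of this document.
os.environ["NIFI_BUCKET"] = "connectors"
print(resolve_param(None, "NIFI_BUCKET"))        # value comes from the environment
print(resolve_param("override", "NIFI_BUCKET"))  # explicit argument takes precedence
```

Keeping the rule in one place makes it easy to see why a value set in the CI environment can be overridden for a single call.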
Quick Start
# 1. Configure NiFi connection
export NIFI_API_ENDPOINT="https://nifi.example.com/nifi-api"
export NIFI_BEARER_TOKEN="your-jwt-token"
# 2. Set up registry client
export GH_REGISTRY_TOKEN="ghp_xxxx"
nipyapi ci ensure_registry --repo owner/repo
# 3. Deploy a flow
nipyapi ci deploy_flow --bucket connectors --flow postgresql
# 4. Start the flow (PROCESS_GROUP_ID is the process_group_id returned by deploy_flow)
nipyapi ci start_flow --process_group_id "$PROCESS_GROUP_ID"
Available Operations
ensure_registry
Create or update a Git Flow Registry Client (GitHub or GitLab).
nipyapi ci ensure_registry \
--token TOKEN \
--repo owner/repo \
--client_name "MyRegistryClient" \
--provider github \
--default_branch main
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | Personal Access Token | |
| | Repository in owner/repo format | |
| | Registry client name | |
| | Git provider (github/gitlab) | |
| | API URL override | |
| | Default branch (default: main) | |
| | Path within repository | |
Returns: registry_client_id, registry_client_name
deploy_flow
Deploy a flow from a Git-based registry to the NiFi canvas.
nipyapi ci deploy_flow \
--registry_client_id REGISTRY_ID \
--bucket connectors \
--flow postgresql \
--branch main \
--version v1.0.0
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the registry client | |
| | Bucket (folder) containing the flow | |
| | Flow name (filename without .json) | |
| | Parent Process Group ID (default: root) | |
| | Branch to deploy from | |
| | Version (commit SHA, tag, branch) | |
| | (x, y) tuple for canvas placement | |
Returns: process_group_id, process_group_name, deployed_version
start_flow
Start a process group (enable controllers, start processors).
nipyapi ci start_flow --process_group_id PG_ID
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | Enable controller services first (default: True) | (none) |
Returns: started, process_group_name
stop_flow
Stop a process group (stop processors, optionally disable controllers).
nipyapi ci stop_flow --process_group_id PG_ID
nipyapi ci stop_flow --process_group_id PG_ID --disable_controllers
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | Also disable controller services | |
Returns: stopped, process_group_name, controllers_disabled
get_status
Get comprehensive status information for a process group.
nipyapi ci get_status --process_group_id PG_ID
nipyapi ci get_status # Uses root process group
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group (default: root) | |
Returns: Comprehensive status including:
process_group_id, process_group_name, state, is_root
Processor counts: total_processors, running_processors, stopped_processors, invalid_processors, disabled_processors
Controller counts: total_controllers, enabled_controllers, disabled_controllers
Queue stats: queued_flowfiles, queued_bytes, active_threads
Version control: versioned, version_id, flow_id, version_state, modified
Parameter context: has_parameter_context, parameter_context_id, parameter_count
Bulletins: bulletin_warnings, bulletin_errors, bulletin_messages
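A pipeline step will often reduce this status to a pass/fail decision. The following is a minimal sketch under stated assumptions: `flow_problems` is a hypothetical helper, the count fields are assumed to be integers, and the sample dict is made up rather than captured from a real NiFi instance.

```python
def flow_problems(status):
    """Return human-readable problems found in a get_status result dict."""
    problems = []
    invalid = status.get("invalid_processors", 0)
    errors = status.get("bulletin_errors", 0)
    if invalid > 0:
        problems.append(f"{invalid} invalid processor(s)")
    if errors > 0:
        problems.append(f"{errors} error bulletin(s)")
    if status.get("running_processors", 0) == 0:
        problems.append("no processors are running")
    return problems


# Made-up sample using field names from the list above.
sample = {"running_processors": 0, "invalid_processors": 2, "bulletin_errors": 1}
for problem in flow_problems(sample):
    print(problem)
```

An empty list means the check passed; a CI job could exit non-zero whenever the list is non-empty.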
configure_params
Configure parameters on a process group’s parameter context.
nipyapi ci configure_params \
--process_group_id PG_ID \
--parameters '{"db_host": "localhost", "db_port": "5432"}'
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | JSON string of parameter name-value pairs | |
Returns: parameters_updated, parameters_count, context_name
Note: This function only updates parameters in the directly attached parameter context.
If your flow uses parameter context inheritance, use configure_inherited_params instead.
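Because the parameters are passed as a JSON string, building that string with a JSON serializer avoids quoting mistakes when values contain quotes or backslashes. This sketch only prepares the argument; `parameters_argument` is a hypothetical helper, and converting every value to a string is an assumption based on the example above, where the port is written as "5432".

```python
import json
import shlex


def parameters_argument(values):
    """Serialize parameter name-value pairs as the JSON string used by --parameters."""
    # Assumption: values are sent as strings, as in the CLI example.
    return json.dumps({name: str(value) for name, value in values.items()})


arg = parameters_argument({"db_host": "localhost", "db_port": 5432})
# shlex.quote makes the JSON safe to paste into a POSIX shell command line.
print("nipyapi ci configure_params --process_group_id PG_ID --parameters " + shlex.quote(arg))
```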
configure_inherited_params
Configure parameters across an inheritance hierarchy, routing each parameter to its owning context.
This function is inheritance-aware: it determines which parameter context owns each parameter and updates it in the correct context, preventing accidental shadowing. It also supports dry-run mode to preview changes before applying them.
# Dry run first (always recommended)
nipyapi ci configure_inherited_params \
--process_group_id PG_ID \
--parameters '{"Source Username": "myuser", "Snowflake Warehouse": "my_wh"}' \
--dry_run
# Execute after confirming the plan
nipyapi ci configure_inherited_params \
--process_group_id PG_ID \
--parameters '{"Source Username": "myuser", "Snowflake Warehouse": "my_wh"}'
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | JSON string of parameter name-value pairs | |
| | Preview changes without applying (default: false) | |
| | Create parameters at top level if not found | |
Returns:
dry_run: Whether this was a dry run
plan: Summary of planned/executed updates (e.g., Param1→Context1 | Param2→Context2)
parameters_updated: Count of parameters updated (0 if dry run)
contexts_modified: Count of contexts modified (0 if dry run)
warnings: Any warnings (e.g., asset replacement)
errors: Any errors (e.g., parameter not found)
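A dry-run result can be inspected before the real run is allowed to proceed. The sketch below assumes the plan string follows the `Param→Context | Param→Context` shape shown in the example; the exact format is not specified here, so `parse_plan` and `safe_to_apply` are hypothetical helpers and the sample result is made up.

```python
def parse_plan(plan):
    """Split a plan summary such as 'Param1→Context1 | Param2→Context2' into a dict."""
    routing = {}
    for entry in plan.split("|"):
        entry = entry.strip()
        if not entry:
            continue
        name, _, context = entry.partition("→")
        routing[name.strip()] = context.strip()
    return routing


def safe_to_apply(dry_run_result):
    """Accept a result only if it came from a dry run and reported no errors."""
    return bool(dry_run_result.get("dry_run")) and not dry_run_result.get("errors")


# Made-up dry-run result using the field names listed above.
result = {"dry_run": True, "plan": "Param1→Context1 | Param2→Context2", "errors": []}
print(parse_plan(result["plan"]))
print(safe_to_apply(result))
```

This parsing breaks if a parameter name itself contains `|` or `→`, so treat it as a convenience for logs rather than a contract.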
When to use this vs configure_params:
Use configure_params for simple flows with a single parameter context
Use configure_inherited_params when your flow has inherited parameter contexts (common with connectors that separate source, destination, and flow-specific parameters)
change_flow_version
Change the version of a deployed flow.
nipyapi ci change_flow_version --process_group_id PG_ID --target_version v2.0.0
nipyapi ci change_flow_version --process_group_id PG_ID # Changes to latest
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | Version (commit SHA, tag, branch) | |
| | Branch to use | |
| | Git token for resolving tags | |
| | Repository in owner/repo format | |
| | Git provider (github/gitlab) | |
Returns: previous_version, new_version, version_state
commit_flow
Commit a flow to version control. For new flows, starts version control by saving the initial version. For existing versioned flows, saves a new version.
# Initial commit (requires registry and bucket)
nipyapi ci commit_flow --process_group_id PG_ID --registry_client MyRegistry --bucket flows --comment "Initial commit"
# Subsequent commit (uses existing version control info)
nipyapi ci commit_flow --process_group_id PG_ID --comment "Updated parameters"
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | Commit message | |
| | Registry client name (initial commit only) | |
| | Bucket/folder name (initial commit only) | |
| | Flow name in registry (defaults to PG name) | |
| | Use FORCE_COMMIT for conflicts | |
Returns: flow_id, version, state, initial_commit
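The difference between an initial and a subsequent commit can be captured in a small argument builder. This is a sketch only: `commit_arguments` is a hypothetical helper, the keyword names mirror the CLI flags shown above on the assumption that the Python function accepts the same names, and the `versioned` flag is expected to come from a `get_status` result.

```python
def commit_arguments(process_group_id, versioned, comment, registry_client=None, bucket=None):
    """Build keyword arguments for a commit, requiring registry details only when needed."""
    args = {"process_group_id": process_group_id, "comment": comment}
    if not versioned:
        # An initial commit starts version control, so it must say where to save the flow.
        if not registry_client or not bucket:
            raise ValueError("an initial commit needs both registry_client and bucket")
        args["registry_client"] = registry_client
        args["bucket"] = bucket
    return args


print(commit_arguments("PG_ID", versioned=False, comment="Initial commit",
                       registry_client="MyRegistry", bucket="flows"))
print(commit_arguments("PG_ID", versioned=True, comment="Updated parameters"))
```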
detach_flow
Detach a flow from version control. This removes version control from a process group, allowing it to be re-versioned to a different registry.
Use this as the first step in forking a read-only flow to your own repository.
# Fork workflow: detach from read-only, save to your own registry
nipyapi ci detach_flow --process_group_id PG_ID
nipyapi ci commit_flow --process_group_id PG_ID --registry_client MyRepo --bucket flows
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
Returns: detached, process_group_name, previous_registry, previous_flow_id, previous_bucket
get_flow_diff
Get local (uncommitted) modifications to a versioned flow.
Use this before upgrading to capture changes that need to be re-applied after upgrade.
nipyapi ci get_flow_diff --process_group_id PG_ID
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the versioned process group | |
Returns:
process_group_id, process_group_name: Process group identification
flow_id: Flow ID in registry
current_version: Currently committed version
state: Version control state (LOCALLY_MODIFIED, UP_TO_DATE, etc.)
modification_count: Number of modified components
modifications: List of modifications, each containing:
  - component_id, component_name, component_type
  - changes: List of changes with type and description
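To keep a record of local changes before an upgrade, the nested result can be flattened into a few log lines. The helper below is a hypothetical sketch that relies only on the field names listed above; the sample values, including the change type, are made up for illustration.

```python
def summarize_diff(diff):
    """Flatten a get_flow_diff result into printable lines."""
    lines = [
        f"{diff['process_group_name']}: {diff['modification_count']} modified component(s), "
        f"state {diff['state']}"
    ]
    for mod in diff.get("modifications", []):
        for change in mod.get("changes", []):
            lines.append(
                f"  {mod['component_type']} {mod['component_name']}: "
                f"{change['type']} - {change['description']}"
            )
    return lines


# Made-up sample shaped like the return value described above.
sample = {
    "process_group_name": "postgresql",
    "state": "LOCALLY_MODIFIED",
    "modification_count": 1,
    "modifications": [
        {
            "component_id": "1234",
            "component_name": "FetchRows",
            "component_type": "Processor",
            "changes": [{"type": "PROPERTY_CHANGED", "description": "Batch Size changed"}],
        }
    ],
}
print("\n".join(summarize_diff(sample)))
```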
get_flow_versions
Get version history for a versioned flow.
nipyapi ci get_flow_versions --process_group_id PG_ID
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the versioned process group | |
Returns:
flow_id: Flow identifier in registry
bucket_id: Bucket containing the flow
registry_id: Registry client ID
current_version: Currently deployed version
state: Version control state
version_count: Number of versions
versions: List of version metadata (version, author, comments, timestamp, branch)
revert_flow
Revert uncommitted local changes to a flow.
nipyapi ci revert_flow --process_group_id PG_ID
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
Returns: reverted, process_group_name, version
cleanup
Stop and optionally delete a process group.
# Just stop (safest)
nipyapi ci cleanup --process_group_id PG_ID --stop_only
# Stop and delete process group
nipyapi ci cleanup --process_group_id PG_ID
# Full cleanup including parameter context
nipyapi ci cleanup --process_group_id PG_ID --delete_parameter_context --force
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | ID of the process group | |
| | Only stop, don’t delete | |
| | Force deletion with queued FlowFiles | |
| | Also delete the parameter context | |
| | Disable controllers (default: true) | |
Returns: stopped, deleted, process_group_name, parameter_context_deleted
list_flows
List version control state for all process groups under a parent.
This function scans process groups and reports their version control status, making it easy to identify which flows need updates, have local modifications, or are out of sync.
# List immediate child process groups from root
nipyapi ci list_flows
# List all process groups recursively
nipyapi ci list_flows --descendants
# List process groups under a specific parent
nipyapi ci list_flows --process_group_id PG_ID
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | Parent PG to search from (default: root) | |
| | Include all nested PGs recursively | |
Returns:
parent_id, parent_name: The parent process group searched from
total_count: Total number of process groups found
versioned_count: Number of version-controlled process groups
stale_count: Number with available updates
modified_count: Number with local modifications
process_groups: List of dicts with version info for each PG:
  - id, name: Process group identification
  - versioned: Whether under version control
  - current_version: Current version (or empty)
  - state: Version state (UP_TO_DATE, STALE, LOCALLY_MODIFIED, etc.)
  - modified: Whether local changes exist
  - flow_name, bucket_name: Registry flow info
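A typical use of this listing is to pick out the flows that need attention. The sketch below works on the returned dict only; `flows_needing_attention` is a hypothetical helper, `modified` is assumed to be a boolean, and the sample data is made up.

```python
def flows_needing_attention(listing):
    """Group process group names by the reason they need attention."""
    groups = listing.get("process_groups", [])
    return {
        "stale": [pg["name"] for pg in groups if pg.get("state") == "STALE"],
        "modified": [pg["name"] for pg in groups if pg.get("modified")],
    }


# Made-up sample shaped like the return value described above.
listing = {
    "process_groups": [
        {"id": "a", "name": "postgresql", "versioned": True, "state": "STALE", "modified": False},
        {"id": "b", "name": "mysql", "versioned": True, "state": "LOCALLY_MODIFIED", "modified": True},
        {"id": "c", "name": "scratch", "versioned": False, "state": "", "modified": False},
    ]
}
print(flows_needing_attention(listing))
```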
purge_flowfiles
Purge queued FlowFiles from a process group.
nipyapi ci purge_flowfiles --process_group_id PG_ID
Returns: purged, process_group_name
upload_asset
Upload an asset file to a process group.
nipyapi ci upload_asset --process_group_id PG_ID --file_path /path/to/asset.jar
Returns: uploaded, asset_id, asset_name
resolve_git_ref
Resolve a git reference (tag, branch, or partial SHA) to a full commit SHA.
This utility function is used internally by change_flow_version and deploy_flow, but can
also be called directly when you need to resolve a git ref before passing it to other operations.
# Resolve a tag to SHA
nipyapi ci resolve_git_ref --ref v1.0.0 --repo owner/repo --token $GH_TOKEN
# Resolve a branch to SHA
nipyapi ci resolve_git_ref --ref main --repo owner/repo --token $GH_TOKEN --provider github
# Already a SHA - returns as-is (no API call needed)
nipyapi ci resolve_git_ref --ref abc123def456
Parameters:
| Parameter | Description | Environment Variable |
|---|---|---|
| | Tag name, branch name, or commit SHA | (none) |
| | Repository in owner/repo format | |
| | Personal access token for API access | |
| | Git provider: github/gitlab (default: github) | |
Returns: The resolved commit SHA, or None if ref was empty.
Notes:
If the ref already looks like a SHA (7-40 hex characters), it’s returned as-is without an API call
Useful for CI/CD pipelines that need to pin to exact commits
Called automatically by change_flow_version when you pass a tag or branch name
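The "looks like a SHA" rule from the notes can be expressed as a single pattern match. This is a sketch of that rule, not the library's code: `looks_like_sha` is a hypothetical name, and accepting uppercase hex digits is an assumption.

```python
import re

# 7 to 40 hexadecimal characters, matching the rule stated in the notes.
_SHA_PATTERN = re.compile(r"[0-9a-fA-F]{7,40}")


def looks_like_sha(ref):
    """Return True if ref consists solely of 7-40 hex characters."""
    return bool(ref) and _SHA_PATTERN.fullmatch(ref) is not None


for ref in ("abc123def456", "v1.0.0", "main", "deadbeef"):
    print(ref, looks_like_sha(ref))
```

One consequence of a purely syntactic check is that a branch or tag named entirely in hex characters, such as `deadbeef`, would be treated as a SHA and not resolved through the API.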
Environment Variable Reference
Connection
| Variable | Description |
|---|---|
| | NiFi API URL |
| | JWT bearer token |
| | Basic auth username |
| | Basic auth password |
| | SSL verification (default: true) |
Registry
| Variable | Description |
|---|---|
| | GitHub Personal Access Token |
| | GitLab Personal Access Token |
| | Repository in owner/repo format |
| | Registry client ID |
| | Registry client name |
| | Git provider (github/gitlab) |
| | API URL override |
| | Default branch |
| | Path within repository |
Flow Operations
| Variable | Description |
|---|---|
| | Process group ID |
| | Parent process group ID |
| | Bucket (folder) name |
| | Flow name |
| | Branch for deployment |
| | Target version (SHA, tag, branch) |
| | Canvas X coordinate |
| | Canvas Y coordinate |
| | JSON parameter values |
Cleanup
| Variable | Description |
|---|---|
| | Only stop, don’t delete (true/false) |
| | Force deletion (true/false) |
| | Delete parameter context (true/false) |
| | Disable controllers (true/false) |
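Boolean settings arrive from the environment as text, so the true/false strings have to be converted somewhere. The sketch below shows one strict way to do that; `parse_bool` is a hypothetical helper, and both the case-insensitive matching and the rejection of other spellings (such as 1 or yes) are assumptions, since only true/false is listed here.

```python
def parse_bool(text, default=False):
    """Convert a 'true'/'false' string to a bool, falling back to default when unset."""
    if text is None or text.strip() == "":
        return default
    normalized = text.strip().lower()
    if normalized == "true":
        return True
    if normalized == "false":
        return False
    # Failing loudly avoids silently deleting or keeping resources on a typo.
    raise ValueError(f"expected 'true' or 'false', got {text!r}")


print(parse_bool("true"))
print(parse_bool("False"))
print(parse_bool(None, default=True))
```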
Complete Workflow Example
Shell Script
#!/bin/bash
set -e
# Configuration
export NIFI_API_ENDPOINT="https://nifi.example.com/nifi-api"
export NIFI_BEARER_TOKEN="$NIFI_TOKEN"
export GH_REGISTRY_TOKEN="$GITHUB_TOKEN"
# 1. Ensure registry client exists
RESULT=$(nipyapi ci ensure_registry --repo myorg/nifi-flows)
REGISTRY_ID=$(echo "$RESULT" | jq -r '.registry_client_id')
echo "Registry client: $REGISTRY_ID"
# 2. Deploy the flow
RESULT=$(nipyapi ci deploy_flow \
--registry_client_id "$REGISTRY_ID" \
--bucket connectors \
--flow postgresql \
--version "$FLOW_VERSION")
PG_ID=$(echo "$RESULT" | jq -r '.process_group_id')
echo "Deployed process group: $PG_ID"
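# 3. Configure parameters (jq builds the JSON so quotes or backslashes in values are escaped)
PARAMS=$(jq -n --arg host "$DB_HOST" --arg password "$DB_PASSWORD" \
'{db_host: $host, db_password: $password}')
nipyapi ci configure_params \
--process_group_id "$PG_ID" \
--parameters "$PARAMS"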
# 4. Start the flow
nipyapi ci start_flow --process_group_id "$PG_ID"
# 5. Verify status
nipyapi ci get_status --process_group_id "$PG_ID"
GitHub Actions
For GitHub Actions integration, use the nipyapi-actions repository which provides reusable actions:
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Deploy NiFi Flow
        uses: Chaffelson/nipyapi-actions@main
        with:
          operation: deploy-flow
          nifi-api-endpoint: ${{ secrets.NIFI_API_ENDPOINT }}
          nifi-token: ${{ secrets.NIFI_TOKEN }}
          registry-token: ${{ secrets.GH_REGISTRY_TOKEN }}
          registry-repo: ${{ github.repository }}
          bucket: connectors
          flow: postgresql
GitLab CI
For GitLab CI, include the fragments template:
include:
  - remote: 'https://raw.githubusercontent.com/Chaffelson/nipyapi-actions/main/templates/fragments.yml'
variables:
  NIFI_API_ENDPOINT: "https://nifi.example.com/nifi-api"
  NIFI_REGISTRY_REPO: "mygroup/nifi-flows"
  NIFI_BUCKET: "connectors"
  NIFI_FLOW: "postgresql"
deploy:
  image: python:3.11
  before_script:
    - !reference [.nipyapi, setup]
  script:
    - !reference [.nipyapi, ensure-registry]
    - !reference [.nipyapi, deploy-flow]
    - !reference [.nipyapi, start-flow]
Python Usage
CI functions can also be used directly in Python:
import os
import nipyapi
# Configure connection
nipyapi.profiles.switch('production')
# Or use environment variables
os.environ['GH_REGISTRY_TOKEN'] = 'ghp_xxxx'
# Ensure registry client
result = nipyapi.ci.ensure_registry(
    repo='myorg/nifi-flows',
    client_name='MyRegistryClient'
)
registry_id = result['registry_client_id']
# Deploy flow
result = nipyapi.ci.deploy_flow(
    registry_client_id=registry_id,
    bucket='connectors',
    flow='postgresql'
)
pg_id = result['process_group_id']
# Start flow
nipyapi.ci.start_flow(process_group_id=pg_id)
# Get status
status = nipyapi.ci.get_status(process_group_id=pg_id)
print(f"State: {status['state']}, Running: {status['running_processors']}")
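A pipeline may want to wait until the flow reaches a desired state before moving on. The sketch below is a generic polling loop, not part of nipyapi: `wait_for` is a hypothetical helper that takes any zero-argument callable returning a status dict, so it runs here against a fake status source. In real use the callable could be `lambda: nipyapi.ci.get_status(process_group_id=pg_id)`.

```python
import time


def wait_for(fetch_status, is_ready, timeout=60.0, interval=2.0):
    """Poll fetch_status() until is_ready(status) is true or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if is_ready(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("flow did not reach the expected state in time")
        time.sleep(interval)


# Fake status source standing in for repeated get_status calls.
responses = iter([
    {"running_processors": 0, "total_processors": 2},
    {"running_processors": 2, "total_processors": 2},
])
final = wait_for(
    lambda: next(responses),
    lambda s: s["running_processors"] == s["total_processors"],
    timeout=5.0,
    interval=0.0,
)
print(final)
```

Passing the readiness check as a function keeps the loop reusable, for example to wait for queues to drain by testing `queued_flowfiles` instead.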
Cross-References
For CLI usage: See Command-Line Interface
For profile configuration: See Environment Profiles with NiPyAPI
For version control details: See Versioning
For GitHub Actions: See nipyapi-actions