Skip to main content

Python SDK Examples

Production-ready code examples for common use cases with the ASCEND Python SDK.

Table of Contents


Basic Action Evaluation

Simple pattern for evaluating and executing governed actions.

"""
Basic action evaluation pattern
"""
from ascend import AscendClient, Decision
import time

client = AscendClient(
api_key="owkai_your_api_key",
agent_id="basic-agent",
agent_name="Basic Agent"
)


def governed_action(action_type: str, resource: str, params: dict) -> dict:
"""Execute an action with ASCEND governance."""

# Request authorization
decision = client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=params
)

if decision.decision == Decision.ALLOWED:
start = time.time()
try:
# Execute the action
result = execute_action(action_type, resource, params)

# Log completion
client.log_action_completed(
action_id=decision.action_id,
result=result,
duration_ms=int((time.time() - start) * 1000)
)
return {"status": "success", "result": result}

except Exception as e:
client.log_action_failed(
action_id=decision.action_id,
error={"message": str(e)}
)
return {"status": "error", "error": str(e)}

elif decision.decision == Decision.PENDING:
return {
"status": "pending_approval",
"approval_id": decision.approval_request_id,
"approvers": decision.required_approvers
}

else:
return {
"status": "denied",
"reason": decision.reason,
"risk_score": decision.risk_score
}


def execute_action(action_type, resource, params):
"""Placeholder for actual action execution."""
return {"executed": True}


# Usage
result = governed_action(
action_type="database.query",
resource="production_db",
params={"query": "SELECT * FROM users LIMIT 10"}
)
print(result)

Customer Service Agent

A complete customer service agent with refund processing.

"""
Customer Service Agent with ASCEND Governance

Features:
- Automatic risk-based approvals
- Refund amount thresholds
- PII handling compliance
"""
from ascend import AscendClient, FailMode, Decision
from dataclasses import dataclass
from typing import Optional
import time


@dataclass
class RefundRequest:
customer_id: str
order_id: str
amount: float
currency: str
reason: str


@dataclass
class RefundResult:
status: str
refund_id: Optional[str] = None
approval_id: Optional[str] = None
message: Optional[str] = None


class CustomerServiceAgent:
"""Customer service agent with ASCEND governance."""

def __init__(self, api_key: str, environment: str = "production"):
self.client = AscendClient(
api_key=api_key,
agent_id="customer-service-agent",
agent_name="Customer Service Bot",
environment=environment,
fail_mode=FailMode.CLOSED,
timeout=5
)

# Register agent
self.client.register(
agent_type="automation",
capabilities=[
"transaction.refund",
"customer.lookup",
"order.status"
],
allowed_resources=[
"stripe_api",
"customer_db",
"order_db"
]
)

def process_refund(self, request: RefundRequest) -> RefundResult:
"""Process a refund with governance."""

# Determine risk indicators based on amount
risk_indicators = {
"financial_data": True,
"amount_threshold": "exceeded" if request.amount > 100 else "normal"
}

# Request authorization
decision = self.client.evaluate_action(
action_type="transaction.refund",
resource="stripe_api",
parameters={
"customer_id": request.customer_id,
"order_id": request.order_id,
"amount": request.amount,
"currency": request.currency,
"reason": request.reason
},
risk_indicators=risk_indicators,
context={
"user_request": f"Refund ${request.amount} for order {request.order_id}"
}
)

if decision.decision == Decision.ALLOWED:
return self._execute_refund(decision.action_id, request)

elif decision.decision == Decision.PENDING:
return RefundResult(
status="pending_approval",
approval_id=decision.approval_request_id,
message=f"Refund requires approval. ID: {decision.approval_request_id}"
)

else:
return RefundResult(
status="denied",
message=f"Refund denied: {decision.reason}"
)

def _execute_refund(self, action_id: str, request: RefundRequest) -> RefundResult:
"""Execute the actual refund."""
start = time.time()

try:
# Simulate refund processing
refund_id = f"ref_{int(time.time())}"

# Log completion
self.client.log_action_completed(
action_id=action_id,
result={
"refund_id": refund_id,
"amount": request.amount
},
duration_ms=int((time.time() - start) * 1000)
)

return RefundResult(
status="completed",
refund_id=refund_id,
message=f"Refund {refund_id} processed successfully"
)

except Exception as e:
self.client.log_action_failed(
action_id=action_id,
error={"code": "REFUND_FAILED", "message": str(e)}
)
return RefundResult(
status="error",
message=f"Refund failed: {str(e)}"
)

def lookup_customer(self, customer_id: str) -> dict:
"""Look up customer with PII handling."""

decision = self.client.evaluate_action(
action_type="customer.lookup",
resource="customer_db",
parameters={"customer_id": customer_id},
risk_indicators={"pii_involved": True}
)

if decision.decision == Decision.ALLOWED:
# Return customer data
return {
"customer_id": customer_id,
"name": "John Doe",
"email": "john@example.com"
}
else:
return {"error": f"Access denied: {decision.reason}"}


# Usage
if __name__ == "__main__":
agent = CustomerServiceAgent(api_key="owkai_prod_xxx")

result = agent.process_refund(RefundRequest(
customer_id="cust_123",
order_id="ord_456",
amount=75.00,
currency="USD",
reason="Product defect"
))

print(f"Refund result: {result}")

Data Pipeline Agent

Agent for ETL operations with governance.

"""
Data Pipeline Agent with ASCEND Governance

Features:
- Batch data processing
- Multi-step pipelines
- Audit logging
"""
from ascend import AscendClient, Decision
from typing import List, Dict, Any
import time


class DataPipelineAgent:
"""Data pipeline agent with ASCEND governance."""

def __init__(self, api_key: str):
self.client = AscendClient(
api_key=api_key,
agent_id="data-pipeline-agent",
agent_name="ETL Pipeline Bot"
)

self.client.register(
agent_type="automation",
capabilities=[
"data.extract",
"data.transform",
"data.load",
"data.delete"
],
allowed_resources=[
"source_db",
"warehouse_db",
"data_lake"
]
)

def run_pipeline(
self,
source: str,
destination: str,
query: str,
transformations: List[str]
) -> Dict[str, Any]:
"""Run a complete ETL pipeline."""

pipeline_id = f"pipe_{int(time.time())}"
results = []

# Step 1: Extract
extract_result = self._extract(source, query, pipeline_id)
if extract_result["status"] != "success":
return {"pipeline_id": pipeline_id, "status": "failed", "step": "extract"}
results.append(extract_result)

# Step 2: Transform
transform_result = self._transform(
extract_result["data"],
transformations,
pipeline_id
)
if transform_result["status"] != "success":
return {"pipeline_id": pipeline_id, "status": "failed", "step": "transform"}
results.append(transform_result)

# Step 3: Load
load_result = self._load(
destination,
transform_result["data"],
pipeline_id
)
results.append(load_result)

return {
"pipeline_id": pipeline_id,
"status": "completed" if load_result["status"] == "success" else "failed",
"steps": results
}

def _extract(self, source: str, query: str, pipeline_id: str) -> dict:
"""Extract data from source."""

decision = self.client.evaluate_action(
action_type="data.extract",
resource=source,
parameters={"query": query},
context={"pipeline_id": pipeline_id}
)

if decision.decision == Decision.ALLOWED:
start = time.time()

# Simulate extraction
data = [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]

self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_extracted": len(data)},
duration_ms=int((time.time() - start) * 1000)
)

return {"status": "success", "data": data, "count": len(data)}

return {
"status": "denied",
"reason": decision.reason
}

def _transform(
self,
data: List[dict],
transformations: List[str],
pipeline_id: str
) -> dict:
"""Transform data."""

decision = self.client.evaluate_action(
action_type="data.transform",
resource="memory",
parameters={
"transformations": transformations,
"row_count": len(data)
},
context={"pipeline_id": pipeline_id}
)

if decision.decision == Decision.ALLOWED:
start = time.time()

# Apply transformations
transformed = [
{**row, "processed": True}
for row in data
]

self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_transformed": len(transformed)},
duration_ms=int((time.time() - start) * 1000)
)

return {"status": "success", "data": transformed}

return {"status": "denied", "reason": decision.reason}

def _load(self, destination: str, data: List[dict], pipeline_id: str) -> dict:
"""Load data to destination."""

decision = self.client.evaluate_action(
action_type="data.load",
resource=destination,
parameters={"row_count": len(data)},
context={"pipeline_id": pipeline_id}
)

if decision.decision == Decision.ALLOWED:
start = time.time()

# Simulate load
rows_loaded = len(data)

self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_loaded": rows_loaded},
duration_ms=int((time.time() - start) * 1000)
)

return {"status": "success", "rows_loaded": rows_loaded}

return {"status": "denied", "reason": decision.reason}


# Usage
if __name__ == "__main__":
pipeline = DataPipelineAgent(api_key="owkai_prod_xxx")

result = pipeline.run_pipeline(
source="source_db",
destination="warehouse_db",
query="SELECT * FROM events WHERE date > '2024-01-01'",
transformations=["normalize", "deduplicate", "enrich"]
)

print(f"Pipeline result: {result}")

Approval Workflow Handler

Handle pending approvals with polling and callbacks.

"""
Approval Workflow Handler

Features:
- Poll for approval status
- Webhook-based notifications
- Timeout handling
"""
from ascend import AscendClient, Decision
import time
import threading
from typing import Callable, Optional


class ApprovalWorkflow:
"""Handle approval workflows for pending actions."""

def __init__(self, client: AscendClient):
self.client = client
self._callbacks: dict = {}

def submit_for_approval(
self,
action_type: str,
resource: str,
parameters: dict,
on_approved: Callable,
on_denied: Callable,
on_timeout: Optional[Callable] = None,
timeout_seconds: int = 300
) -> str:
"""Submit action and handle approval asynchronously."""

decision = self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters,
wait_for_decision=False
)

if decision.decision == Decision.ALLOWED:
on_approved(decision)
return decision.action_id

elif decision.decision == Decision.DENIED:
on_denied(decision)
return decision.action_id

elif decision.decision == Decision.PENDING:
# Start background polling
approval_id = decision.approval_request_id

thread = threading.Thread(
target=self._poll_approval,
args=(
approval_id,
decision,
on_approved,
on_denied,
on_timeout,
timeout_seconds
)
)
thread.daemon = True
thread.start()

return decision.action_id

def _poll_approval(
self,
approval_id: str,
decision,
on_approved: Callable,
on_denied: Callable,
on_timeout: Optional[Callable],
timeout_seconds: int
):
"""Poll for approval status in background."""

start = time.time()
poll_interval = 5

while time.time() - start < timeout_seconds:
try:
status = self.client.check_approval(approval_id)

if status["approved"]:
decision.decision = Decision.ALLOWED
on_approved(decision)
return

elif status["denied"]:
decision.decision = Decision.DENIED
decision.reason = status.get("comments", "Denied by reviewer")
on_denied(decision)
return

except Exception as e:
print(f"Error checking approval: {e}")

time.sleep(poll_interval)

# Timeout
if on_timeout:
on_timeout(decision)


# Usage
def handle_approved(decision):
print(f"Action approved: {decision.action_id}")
# Execute the action
result = execute_sensitive_operation()
print(f"Executed: {result}")


def handle_denied(decision):
print(f"Action denied: {decision.reason}")


def handle_timeout(decision):
print(f"Approval timed out for: {decision.action_id}")


def execute_sensitive_operation():
return {"status": "completed"}


if __name__ == "__main__":
client = AscendClient(
api_key="owkai_prod_xxx",
agent_id="approval-agent",
agent_name="Approval Handler"
)

workflow = ApprovalWorkflow(client)

action_id = workflow.submit_for_approval(
action_type="database.delete",
resource="production_db",
parameters={"table": "old_logs", "where": "created_at < '2023-01-01'"},
on_approved=handle_approved,
on_denied=handle_denied,
on_timeout=handle_timeout,
timeout_seconds=300
)

print(f"Submitted action: {action_id}")
print("Waiting for approval...")

# Keep main thread alive
time.sleep(310)

Webhook Server

Receive and process ASCEND webhooks.

"""
ASCEND Webhook Server

Features:
- Signature verification
- Event routing
- Idempotency handling
"""
from flask import Flask, request, jsonify
import hmac
import hashlib
from typing import Dict, Callable

app = Flask(__name__)

WEBHOOK_SECRET = "whsec_your_secret_here"

# Event handlers
event_handlers: Dict[str, Callable] = {}


def verify_signature(payload: bytes, timestamp: str, signature: str) -> bool:
"""Verify webhook signature."""
expected = hmac.new(
WEBHOOK_SECRET.encode(),
f"{timestamp}.{payload.decode()}".encode(),
hashlib.sha256
).hexdigest()

return hmac.compare_digest(f"v1={expected}", signature)


def on_event(event_type: str):
"""Decorator to register event handlers."""
def decorator(func: Callable):
event_handlers[event_type] = func
return func
return decorator


@on_event("action.approved")
def handle_action_approved(data: dict):
"""Handle approved action event."""
action_id = data.get("action_id")
agent_id = data.get("agent_id")
approver = data.get("approver")

print(f"Action {action_id} approved by {approver}")

# Trigger execution of pending action
execute_pending_action(action_id)


@on_event("action.denied")
def handle_action_denied(data: dict):
"""Handle denied action event."""
action_id = data.get("action_id")
reason = data.get("reason")

print(f"Action {action_id} denied: {reason}")

# Notify requester
notify_user_of_denial(action_id, reason)


@on_event("policy.violation")
def handle_policy_violation(data: dict):
"""Handle policy violation event."""
agent_id = data.get("agent_id")
violations = data.get("violations", [])

print(f"Policy violation by {agent_id}: {violations}")

# Alert security team
alert_security(agent_id, violations)


@on_event("agent.trust_changed")
def handle_trust_changed(data: dict):
"""Handle agent trust level change."""
agent_id = data.get("agent_id")
old_level = data.get("old_trust_level")
new_level = data.get("new_trust_level")

print(f"Agent {agent_id} trust changed: {old_level} -> {new_level}")


@app.route("/webhooks/ascend", methods=["POST"])
def webhook_handler():
"""Main webhook endpoint."""

# Get signature headers
signature = request.headers.get("X-Signature", "")
timestamp = request.headers.get("X-Timestamp", "")

# Verify signature
if not verify_signature(request.data, timestamp, signature):
return jsonify({"error": "Invalid signature"}), 401

# Parse payload
payload = request.json
event_type = payload.get("event")
data = payload.get("data", {})

# Route to handler
handler = event_handlers.get(event_type)
if handler:
try:
handler(data)
return jsonify({"status": "processed"}), 200
except Exception as e:
print(f"Handler error: {e}")
return jsonify({"error": "Processing failed"}), 500

return jsonify({"status": "ignored", "event": event_type}), 200


# Placeholder functions
def execute_pending_action(action_id: str):
pass

def notify_user_of_denial(action_id: str, reason: str):
pass

def alert_security(agent_id: str, violations: list):
pass


if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080, ssl_context="adhoc")

Async Operations

Using ASCEND with asyncio.

"""
Async ASCEND Operations

Features:
- Async action evaluation
- Concurrent request handling
- Async context managers
"""
import asyncio
from ascend import AscendClient, Decision
from typing import List, Dict
import time


class AsyncGovernedAgent:
"""Agent with async governance operations."""

def __init__(self, api_key: str):
self.client = AscendClient(
api_key=api_key,
agent_id="async-agent",
agent_name="Async Agent"
)

async def evaluate_action_async(
self,
action_type: str,
resource: str,
parameters: dict
) -> dict:
"""Evaluate action asynchronously."""

# Run synchronous SDK call in executor
loop = asyncio.get_event_loop()
decision = await loop.run_in_executor(
None,
lambda: self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters,
wait_for_decision=False
)
)

return {
"decision": decision.decision.value,
"action_id": decision.action_id,
"risk_score": decision.risk_score
}

async def batch_evaluate(
self,
actions: List[Dict]
) -> List[dict]:
"""Evaluate multiple actions concurrently."""

tasks = [
self.evaluate_action_async(
action["action_type"],
action["resource"],
action["parameters"]
)
for action in actions
]

return await asyncio.gather(*tasks)


async def main():
agent = AsyncGovernedAgent(api_key="owkai_prod_xxx")

# Single async evaluation
result = await agent.evaluate_action_async(
action_type="database.query",
resource="production_db",
parameters={"query": "SELECT * FROM users"}
)
print(f"Single result: {result}")

# Batch evaluation
actions = [
{"action_type": "database.query", "resource": "db1", "parameters": {"query": "Q1"}},
{"action_type": "file.read", "resource": "/logs", "parameters": {"path": "/app.log"}},
{"action_type": "api.call", "resource": "api", "parameters": {"endpoint": "/status"}}
]

results = await agent.batch_evaluate(actions)
print(f"Batch results: {results}")


if __name__ == "__main__":
asyncio.run(main())

Error Handling Patterns

Comprehensive error handling for production.

"""
Error Handling Patterns

Features:
- Retry with backoff
- Graceful degradation
- Error reporting
"""
from ascend import (
AscendClient,
Decision,
AuthenticationError,
AuthorizationError,
TimeoutError,
RateLimitError,
ConnectionError,
CircuitBreakerOpen,
ValidationError
)
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ResilientAgent:
"""Agent with resilient error handling."""

def __init__(self, api_key: str, fail_open: bool = False):
self.client = AscendClient(
api_key=api_key,
agent_id="resilient-agent",
agent_name="Resilient Agent",
fail_mode="open" if fail_open else "closed"
)
self.fail_open = fail_open

def governed_action(
self,
action_type: str,
resource: str,
parameters: dict,
max_retries: int = 3
) -> dict:
"""Execute action with comprehensive error handling."""

for attempt in range(max_retries):
try:
return self._execute_with_governance(
action_type, resource, parameters
)

except AuthenticationError as e:
# Don't retry auth errors
logger.error(f"Authentication failed: {e}")
return {
"status": "error",
"error_type": "authentication",
"message": "Invalid API credentials",
"recoverable": False
}

except AuthorizationError as e:
# Policy denial - don't retry
logger.warning(f"Authorization denied: {e}")
return {
"status": "denied",
"error_type": "authorization",
"message": e.message,
"policy_violations": e.policy_violations,
"risk_score": e.risk_score,
"recoverable": False
}

except ValidationError as e:
# Input error - don't retry
logger.error(f"Validation failed: {e}")
return {
"status": "error",
"error_type": "validation",
"message": e.message,
"field_errors": e.field_errors,
"recoverable": False
}

except RateLimitError as e:
# Rate limited - wait and retry
wait_time = min(e.retry_after, 60)
logger.warning(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
continue

except TimeoutError as e:
# Timeout - retry with backoff
logger.warning(f"Timeout on attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return self._handle_service_unavailable("timeout")

except ConnectionError as e:
# Connection error - retry with backoff
logger.warning(f"Connection error on attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return self._handle_service_unavailable("connection")

except CircuitBreakerOpen as e:
# Circuit open - don't retry immediately
logger.error(f"Circuit breaker open. Recovery in {e.recovery_time}s")
return self._handle_service_unavailable("circuit_open")

except Exception as e:
# Unexpected error
logger.exception(f"Unexpected error: {e}")
return {
"status": "error",
"error_type": "unexpected",
"message": str(e),
"recoverable": False
}

return self._handle_service_unavailable("max_retries")

def _execute_with_governance(
self,
action_type: str,
resource: str,
parameters: dict
) -> dict:
"""Execute single governed action."""

decision = self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters
)

if decision.decision == Decision.ALLOWED:
return {
"status": "allowed",
"action_id": decision.action_id,
"risk_score": decision.risk_score
}

elif decision.decision == Decision.PENDING:
return {
"status": "pending_approval",
"action_id": decision.action_id,
"approval_id": decision.approval_request_id
}

else:
return {
"status": "denied",
"reason": decision.reason,
"risk_score": decision.risk_score
}

def _handle_service_unavailable(self, reason: str) -> dict:
"""Handle service unavailability based on fail mode."""

if self.fail_open:
logger.warning(f"ASCEND unavailable ({reason}). Allowing action (fail-open).")
return {
"status": "allowed_failopen",
"warning": "Action allowed due to governance service unavailability",
"reason": reason
}

return {
"status": "blocked",
"error_type": "service_unavailable",
"message": "Governance service unavailable",
"reason": reason,
"recoverable": True
}


# Usage
if __name__ == "__main__":
agent = ResilientAgent(api_key="owkai_prod_xxx", fail_open=False)

result = agent.governed_action(
action_type="database.query",
resource="production_db",
parameters={"query": "SELECT * FROM users"}
)

print(f"Result: {result}")

Testing with ASCEND

Test your agents without hitting production.

"""
Testing Patterns for ASCEND

Features:
- Mock client for unit tests
- Integration test helpers
- Fixture patterns
"""
import pytest
from unittest.mock import Mock, patch
from ascend import AscendClient, Decision


class MockAscendClient:
"""Mock ASCEND client for testing."""

def __init__(self, default_decision: Decision = Decision.ALLOWED):
self.default_decision = default_decision
self.calls = []

def evaluate_action(self, **kwargs):
self.calls.append(kwargs)

# Create mock decision
decision = Mock()
decision.decision = self.default_decision
decision.action_id = f"test_action_{len(self.calls)}"
decision.risk_score = 25
decision.reason = "Test decision"
decision.approval_request_id = "test_approval_123"

return decision

def log_action_completed(self, **kwargs):
self.calls.append({"type": "completed", **kwargs})

def log_action_failed(self, **kwargs):
self.calls.append({"type": "failed", **kwargs})


@pytest.fixture
def mock_ascend():
"""Pytest fixture for mock ASCEND client."""
return MockAscendClient()


@pytest.fixture
def mock_ascend_denied():
"""Fixture that returns denied decisions."""
return MockAscendClient(default_decision=Decision.DENIED)


class TestMyAgent:
"""Example test class."""

def test_allowed_action(self, mock_ascend):
"""Test that allowed actions execute."""
from my_agent import MyAgent

agent = MyAgent(client=mock_ascend)
result = agent.do_something()

assert result["status"] == "success"
assert len(mock_ascend.calls) >= 1

def test_denied_action(self, mock_ascend_denied):
"""Test that denied actions are handled."""
from my_agent import MyAgent

agent = MyAgent(client=mock_ascend_denied)
result = agent.do_something()

assert result["status"] == "denied"

def test_action_parameters(self, mock_ascend):
"""Test that correct parameters are sent."""
from my_agent import MyAgent

agent = MyAgent(client=mock_ascend)
agent.query_database("SELECT * FROM users")

# Verify call parameters
call = mock_ascend.calls[0]
assert call["action_type"] == "database.query"
assert "SELECT" in call["parameters"]["query"]


# Integration test with real ASCEND (staging)
@pytest.mark.integration
class TestAscendIntegration:
"""Integration tests against staging ASCEND."""

@pytest.fixture
def staging_client(self):
return AscendClient(
api_key="owkai_staging_xxx",
agent_id="test-agent",
agent_name="Integration Test Agent",
api_url="https://staging.owkai.app"
)

def test_connection(self, staging_client):
"""Test connection to staging."""
status = staging_client.test_connection()
assert status["status"] == "connected"

def test_low_risk_action(self, staging_client):
"""Test low-risk action is approved."""
decision = staging_client.evaluate_action(
action_type="database.query",
resource="test_db",
parameters={"query": "SELECT 1"}
)
assert decision.decision == Decision.ALLOWED

See Also