Python SDK Examples
Production-ready code examples for common use cases with the ASCEND Python SDK.
Table of Contents
- Basic Action Evaluation
- Customer Service Agent
- Data Pipeline Agent
- Approval Workflow Handler
- Webhook Server
- Async Operations
- Error Handling Patterns
- Testing with ASCEND
Basic Action Evaluation
Simple pattern for evaluating and executing governed actions.
"""
Basic action evaluation pattern
"""
from ascend import AscendClient, Decision
import time
client = AscendClient(
api_key="owkai_your_api_key",
agent_id="basic-agent",
agent_name="Basic Agent"
)
def governed_action(action_type: str, resource: str, params: dict) -> dict:
"""Execute an action with ASCEND governance."""
# Request authorization
decision = client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=params
)
if decision.decision == Decision.ALLOWED:
start = time.time()
try:
# Execute the action
result = execute_action(action_type, resource, params)
# Log completion
client.log_action_completed(
action_id=decision.action_id,
result=result,
duration_ms=int((time.time() - start) * 1000)
)
return {"status": "success", "result": result}
except Exception as e:
client.log_action_failed(
action_id=decision.action_id,
error={"message": str(e)}
)
return {"status": "error", "error": str(e)}
elif decision.decision == Decision.PENDING:
return {
"status": "pending_approval",
"approval_id": decision.approval_request_id,
"approvers": decision.required_approvers
}
else:
return {
"status": "denied",
"reason": decision.reason,
"risk_score": decision.risk_score
}
def execute_action(action_type, resource, params):
"""Placeholder for actual action execution."""
return {"executed": True}
# Usage
result = governed_action(
action_type="database.query",
resource="production_db",
params={"query": "SELECT * FROM users LIMIT 10"}
)
print(result)
Customer Service Agent
A complete customer service agent with refund processing.
"""
Customer Service Agent with ASCEND Governance
Features:
- Automatic risk-based approvals
- Refund amount thresholds
- PII handling compliance
"""
from ascend import AscendClient, FailMode, Decision
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class RefundRequest:
customer_id: str
order_id: str
amount: float
currency: str
reason: str
@dataclass
class RefundResult:
status: str
refund_id: Optional[str] = None
approval_id: Optional[str] = None
message: Optional[str] = None
class CustomerServiceAgent:
"""Customer service agent with ASCEND governance."""
def __init__(self, api_key: str, environment: str = "production"):
self.client = AscendClient(
api_key=api_key,
agent_id="customer-service-agent",
agent_name="Customer Service Bot",
environment=environment,
fail_mode=FailMode.CLOSED,
timeout=5
)
# Register agent
self.client.register(
agent_type="automation",
capabilities=[
"transaction.refund",
"customer.lookup",
"order.status"
],
allowed_resources=[
"stripe_api",
"customer_db",
"order_db"
]
)
def process_refund(self, request: RefundRequest) -> RefundResult:
"""Process a refund with governance."""
# Determine risk indicators based on amount
risk_indicators = {
"financial_data": True,
"amount_threshold": "exceeded" if request.amount > 100 else "normal"
}
# Request authorization
decision = self.client.evaluate_action(
action_type="transaction.refund",
resource="stripe_api",
parameters={
"customer_id": request.customer_id,
"order_id": request.order_id,
"amount": request.amount,
"currency": request.currency,
"reason": request.reason
},
risk_indicators=risk_indicators,
context={
"user_request": f"Refund ${request.amount} for order {request.order_id}"
}
)
if decision.decision == Decision.ALLOWED:
return self._execute_refund(decision.action_id, request)
elif decision.decision == Decision.PENDING:
return RefundResult(
status="pending_approval",
approval_id=decision.approval_request_id,
message=f"Refund requires approval. ID: {decision.approval_request_id}"
)
else:
return RefundResult(
status="denied",
message=f"Refund denied: {decision.reason}"
)
def _execute_refund(self, action_id: str, request: RefundRequest) -> RefundResult:
"""Execute the actual refund."""
start = time.time()
try:
# Simulate refund processing
refund_id = f"ref_{int(time.time())}"
# Log completion
self.client.log_action_completed(
action_id=action_id,
result={
"refund_id": refund_id,
"amount": request.amount
},
duration_ms=int((time.time() - start) * 1000)
)
return RefundResult(
status="completed",
refund_id=refund_id,
message=f"Refund {refund_id} processed successfully"
)
except Exception as e:
self.client.log_action_failed(
action_id=action_id,
error={"code": "REFUND_FAILED", "message": str(e)}
)
return RefundResult(
status="error",
message=f"Refund failed: {str(e)}"
)
def lookup_customer(self, customer_id: str) -> dict:
"""Look up customer with PII handling."""
decision = self.client.evaluate_action(
action_type="customer.lookup",
resource="customer_db",
parameters={"customer_id": customer_id},
risk_indicators={"pii_involved": True}
)
if decision.decision == Decision.ALLOWED:
# Return customer data
return {
"customer_id": customer_id,
"name": "John Doe",
"email": "john@example.com"
}
else:
return {"error": f"Access denied: {decision.reason}"}
# Usage
if __name__ == "__main__":
agent = CustomerServiceAgent(api_key="owkai_prod_xxx")
result = agent.process_refund(RefundRequest(
customer_id="cust_123",
order_id="ord_456",
amount=75.00,
currency="USD",
reason="Product defect"
))
print(f"Refund result: {result}")
Data Pipeline Agent
Agent for ETL operations with governance.
"""
Data Pipeline Agent with ASCEND Governance
Features:
- Batch data processing
- Multi-step pipelines
- Audit logging
"""
from ascend import AscendClient, Decision
from typing import List, Dict, Any
import time
class DataPipelineAgent:
"""Data pipeline agent with ASCEND governance."""
def __init__(self, api_key: str):
self.client = AscendClient(
api_key=api_key,
agent_id="data-pipeline-agent",
agent_name="ETL Pipeline Bot"
)
self.client.register(
agent_type="automation",
capabilities=[
"data.extract",
"data.transform",
"data.load",
"data.delete"
],
allowed_resources=[
"source_db",
"warehouse_db",
"data_lake"
]
)
def run_pipeline(
self,
source: str,
destination: str,
query: str,
transformations: List[str]
) -> Dict[str, Any]:
"""Run a complete ETL pipeline."""
pipeline_id = f"pipe_{int(time.time())}"
results = []
# Step 1: Extract
extract_result = self._extract(source, query, pipeline_id)
if extract_result["status"] != "success":
return {"pipeline_id": pipeline_id, "status": "failed", "step": "extract"}
results.append(extract_result)
# Step 2: Transform
transform_result = self._transform(
extract_result["data"],
transformations,
pipeline_id
)
if transform_result["status"] != "success":
return {"pipeline_id": pipeline_id, "status": "failed", "step": "transform"}
results.append(transform_result)
# Step 3: Load
load_result = self._load(
destination,
transform_result["data"],
pipeline_id
)
results.append(load_result)
return {
"pipeline_id": pipeline_id,
"status": "completed" if load_result["status"] == "success" else "failed",
"steps": results
}
def _extract(self, source: str, query: str, pipeline_id: str) -> dict:
"""Extract data from source."""
decision = self.client.evaluate_action(
action_type="data.extract",
resource=source,
parameters={"query": query},
context={"pipeline_id": pipeline_id}
)
if decision.decision == Decision.ALLOWED:
start = time.time()
# Simulate extraction
data = [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_extracted": len(data)},
duration_ms=int((time.time() - start) * 1000)
)
return {"status": "success", "data": data, "count": len(data)}
return {
"status": "denied",
"reason": decision.reason
}
def _transform(
self,
data: List[dict],
transformations: List[str],
pipeline_id: str
) -> dict:
"""Transform data."""
decision = self.client.evaluate_action(
action_type="data.transform",
resource="memory",
parameters={
"transformations": transformations,
"row_count": len(data)
},
context={"pipeline_id": pipeline_id}
)
if decision.decision == Decision.ALLOWED:
start = time.time()
# Apply transformations
transformed = [
{**row, "processed": True}
for row in data
]
self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_transformed": len(transformed)},
duration_ms=int((time.time() - start) * 1000)
)
return {"status": "success", "data": transformed}
return {"status": "denied", "reason": decision.reason}
def _load(self, destination: str, data: List[dict], pipeline_id: str) -> dict:
"""Load data to destination."""
decision = self.client.evaluate_action(
action_type="data.load",
resource=destination,
parameters={"row_count": len(data)},
context={"pipeline_id": pipeline_id}
)
if decision.decision == Decision.ALLOWED:
start = time.time()
# Simulate load
rows_loaded = len(data)
self.client.log_action_completed(
action_id=decision.action_id,
result={"rows_loaded": rows_loaded},
duration_ms=int((time.time() - start) * 1000)
)
return {"status": "success", "rows_loaded": rows_loaded}
return {"status": "denied", "reason": decision.reason}
# Usage
if __name__ == "__main__":
pipeline = DataPipelineAgent(api_key="owkai_prod_xxx")
result = pipeline.run_pipeline(
source="source_db",
destination="warehouse_db",
query="SELECT * FROM events WHERE date > '2024-01-01'",
transformations=["normalize", "deduplicate", "enrich"]
)
print(f"Pipeline result: {result}")
Approval Workflow Handler
Handle pending approvals with polling and callbacks.
"""
Approval Workflow Handler
Features:
- Poll for approval status
- Webhook-based notifications
- Timeout handling
"""
from ascend import AscendClient, Decision
import time
import threading
from typing import Callable, Optional
class ApprovalWorkflow:
"""Handle approval workflows for pending actions."""
def __init__(self, client: AscendClient):
self.client = client
self._callbacks: dict = {}
def submit_for_approval(
self,
action_type: str,
resource: str,
parameters: dict,
on_approved: Callable,
on_denied: Callable,
on_timeout: Optional[Callable] = None,
timeout_seconds: int = 300
) -> str:
"""Submit action and handle approval asynchronously."""
decision = self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters,
wait_for_decision=False
)
if decision.decision == Decision.ALLOWED:
on_approved(decision)
return decision.action_id
elif decision.decision == Decision.DENIED:
on_denied(decision)
return decision.action_id
elif decision.decision == Decision.PENDING:
# Start background polling
approval_id = decision.approval_request_id
thread = threading.Thread(
target=self._poll_approval,
args=(
approval_id,
decision,
on_approved,
on_denied,
on_timeout,
timeout_seconds
)
)
thread.daemon = True
thread.start()
return decision.action_id
def _poll_approval(
self,
approval_id: str,
decision,
on_approved: Callable,
on_denied: Callable,
on_timeout: Optional[Callable],
timeout_seconds: int
):
"""Poll for approval status in background."""
start = time.time()
poll_interval = 5
while time.time() - start < timeout_seconds:
try:
status = self.client.check_approval(approval_id)
if status["approved"]:
decision.decision = Decision.ALLOWED
on_approved(decision)
return
elif status["denied"]:
decision.decision = Decision.DENIED
decision.reason = status.get("comments", "Denied by reviewer")
on_denied(decision)
return
except Exception as e:
print(f"Error checking approval: {e}")
time.sleep(poll_interval)
# Timeout
if on_timeout:
on_timeout(decision)
# Usage
def handle_approved(decision):
print(f"Action approved: {decision.action_id}")
# Execute the action
result = execute_sensitive_operation()
print(f"Executed: {result}")
def handle_denied(decision):
print(f"Action denied: {decision.reason}")
def handle_timeout(decision):
print(f"Approval timed out for: {decision.action_id}")
def execute_sensitive_operation():
return {"status": "completed"}
if __name__ == "__main__":
client = AscendClient(
api_key="owkai_prod_xxx",
agent_id="approval-agent",
agent_name="Approval Handler"
)
workflow = ApprovalWorkflow(client)
action_id = workflow.submit_for_approval(
action_type="database.delete",
resource="production_db",
parameters={"table": "old_logs", "where": "created_at < '2023-01-01'"},
on_approved=handle_approved,
on_denied=handle_denied,
on_timeout=handle_timeout,
timeout_seconds=300
)
print(f"Submitted action: {action_id}")
print("Waiting for approval...")
# Keep main thread alive
time.sleep(310)
Webhook Server
Receive and process ASCEND webhooks.
"""
ASCEND Webhook Server
Features:
- Signature verification
- Event routing
- Idempotency handling
"""
from flask import Flask, request, jsonify
import hmac
import hashlib
from typing import Dict, Callable
app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_secret_here"
# Event handlers
event_handlers: Dict[str, Callable] = {}
def verify_signature(payload: bytes, timestamp: str, signature: str) -> bool:
"""Verify webhook signature."""
expected = hmac.new(
WEBHOOK_SECRET.encode(),
f"{timestamp}.{payload.decode()}".encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"v1={expected}", signature)
def on_event(event_type: str):
"""Decorator to register event handlers."""
def decorator(func: Callable):
event_handlers[event_type] = func
return func
return decorator
@on_event("action.approved")
def handle_action_approved(data: dict):
"""Handle approved action event."""
action_id = data.get("action_id")
agent_id = data.get("agent_id")
approver = data.get("approver")
print(f"Action {action_id} approved by {approver}")
# Trigger execution of pending action
execute_pending_action(action_id)
@on_event("action.denied")
def handle_action_denied(data: dict):
"""Handle denied action event."""
action_id = data.get("action_id")
reason = data.get("reason")
print(f"Action {action_id} denied: {reason}")
# Notify requester
notify_user_of_denial(action_id, reason)
@on_event("policy.violation")
def handle_policy_violation(data: dict):
"""Handle policy violation event."""
agent_id = data.get("agent_id")
violations = data.get("violations", [])
print(f"Policy violation by {agent_id}: {violations}")
# Alert security team
alert_security(agent_id, violations)
@on_event("agent.trust_changed")
def handle_trust_changed(data: dict):
"""Handle agent trust level change."""
agent_id = data.get("agent_id")
old_level = data.get("old_trust_level")
new_level = data.get("new_trust_level")
print(f"Agent {agent_id} trust changed: {old_level} -> {new_level}")
@app.route("/webhooks/ascend", methods=["POST"])
def webhook_handler():
"""Main webhook endpoint."""
# Get signature headers
signature = request.headers.get("X-Signature", "")
timestamp = request.headers.get("X-Timestamp", "")
# Verify signature
if not verify_signature(request.data, timestamp, signature):
return jsonify({"error": "Invalid signature"}), 401
# Parse payload
payload = request.json
event_type = payload.get("event")
data = payload.get("data", {})
# Route to handler
handler = event_handlers.get(event_type)
if handler:
try:
handler(data)
return jsonify({"status": "processed"}), 200
except Exception as e:
print(f"Handler error: {e}")
return jsonify({"error": "Processing failed"}), 500
return jsonify({"status": "ignored", "event": event_type}), 200
# Placeholder functions
def execute_pending_action(action_id: str):
pass
def notify_user_of_denial(action_id: str, reason: str):
pass
def alert_security(agent_id: str, violations: list):
pass
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080, ssl_context="adhoc")
Async Operations
Using ASCEND with asyncio.
"""
Async ASCEND Operations
Features:
- Async action evaluation
- Concurrent request handling
- Async context managers
"""
import asyncio
from ascend import AscendClient, Decision
from typing import List, Dict
import time
class AsyncGovernedAgent:
"""Agent with async governance operations."""
def __init__(self, api_key: str):
self.client = AscendClient(
api_key=api_key,
agent_id="async-agent",
agent_name="Async Agent"
)
async def evaluate_action_async(
self,
action_type: str,
resource: str,
parameters: dict
) -> dict:
"""Evaluate action asynchronously."""
# Run synchronous SDK call in executor
loop = asyncio.get_event_loop()
decision = await loop.run_in_executor(
None,
lambda: self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters,
wait_for_decision=False
)
)
return {
"decision": decision.decision.value,
"action_id": decision.action_id,
"risk_score": decision.risk_score
}
async def batch_evaluate(
self,
actions: List[Dict]
) -> List[dict]:
"""Evaluate multiple actions concurrently."""
tasks = [
self.evaluate_action_async(
action["action_type"],
action["resource"],
action["parameters"]
)
for action in actions
]
return await asyncio.gather(*tasks)
async def main():
agent = AsyncGovernedAgent(api_key="owkai_prod_xxx")
# Single async evaluation
result = await agent.evaluate_action_async(
action_type="database.query",
resource="production_db",
parameters={"query": "SELECT * FROM users"}
)
print(f"Single result: {result}")
# Batch evaluation
actions = [
{"action_type": "database.query", "resource": "db1", "parameters": {"query": "Q1"}},
{"action_type": "file.read", "resource": "/logs", "parameters": {"path": "/app.log"}},
{"action_type": "api.call", "resource": "api", "parameters": {"endpoint": "/status"}}
]
results = await agent.batch_evaluate(actions)
print(f"Batch results: {results}")
if __name__ == "__main__":
asyncio.run(main())
Error Handling Patterns
Comprehensive error handling for production.
"""
Error Handling Patterns
Features:
- Retry with backoff
- Graceful degradation
- Error reporting
"""
from ascend import (
AscendClient,
Decision,
AuthenticationError,
AuthorizationError,
TimeoutError,
RateLimitError,
ConnectionError,
CircuitBreakerOpen,
ValidationError
)
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ResilientAgent:
"""Agent with resilient error handling."""
def __init__(self, api_key: str, fail_open: bool = False):
self.client = AscendClient(
api_key=api_key,
agent_id="resilient-agent",
agent_name="Resilient Agent",
fail_mode="open" if fail_open else "closed"
)
self.fail_open = fail_open
def governed_action(
self,
action_type: str,
resource: str,
parameters: dict,
max_retries: int = 3
) -> dict:
"""Execute action with comprehensive error handling."""
for attempt in range(max_retries):
try:
return self._execute_with_governance(
action_type, resource, parameters
)
except AuthenticationError as e:
# Don't retry auth errors
logger.error(f"Authentication failed: {e}")
return {
"status": "error",
"error_type": "authentication",
"message": "Invalid API credentials",
"recoverable": False
}
except AuthorizationError as e:
# Policy denial - don't retry
logger.warning(f"Authorization denied: {e}")
return {
"status": "denied",
"error_type": "authorization",
"message": e.message,
"policy_violations": e.policy_violations,
"risk_score": e.risk_score,
"recoverable": False
}
except ValidationError as e:
# Input error - don't retry
logger.error(f"Validation failed: {e}")
return {
"status": "error",
"error_type": "validation",
"message": e.message,
"field_errors": e.field_errors,
"recoverable": False
}
except RateLimitError as e:
# Rate limited - wait and retry
wait_time = min(e.retry_after, 60)
logger.warning(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
except TimeoutError as e:
# Timeout - retry with backoff
logger.warning(f"Timeout on attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return self._handle_service_unavailable("timeout")
except ConnectionError as e:
# Connection error - retry with backoff
logger.warning(f"Connection error on attempt {attempt + 1}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
return self._handle_service_unavailable("connection")
except CircuitBreakerOpen as e:
# Circuit open - don't retry immediately
logger.error(f"Circuit breaker open. Recovery in {e.recovery_time}s")
return self._handle_service_unavailable("circuit_open")
except Exception as e:
# Unexpected error
logger.exception(f"Unexpected error: {e}")
return {
"status": "error",
"error_type": "unexpected",
"message": str(e),
"recoverable": False
}
return self._handle_service_unavailable("max_retries")
def _execute_with_governance(
self,
action_type: str,
resource: str,
parameters: dict
) -> dict:
"""Execute single governed action."""
decision = self.client.evaluate_action(
action_type=action_type,
resource=resource,
parameters=parameters
)
if decision.decision == Decision.ALLOWED:
return {
"status": "allowed",
"action_id": decision.action_id,
"risk_score": decision.risk_score
}
elif decision.decision == Decision.PENDING:
return {
"status": "pending_approval",
"action_id": decision.action_id,
"approval_id": decision.approval_request_id
}
else:
return {
"status": "denied",
"reason": decision.reason,
"risk_score": decision.risk_score
}
def _handle_service_unavailable(self, reason: str) -> dict:
"""Handle service unavailability based on fail mode."""
if self.fail_open:
logger.warning(f"ASCEND unavailable ({reason}). Allowing action (fail-open).")
return {
"status": "allowed_failopen",
"warning": "Action allowed due to governance service unavailability",
"reason": reason
}
return {
"status": "blocked",
"error_type": "service_unavailable",
"message": "Governance service unavailable",
"reason": reason,
"recoverable": True
}
# Usage
if __name__ == "__main__":
agent = ResilientAgent(api_key="owkai_prod_xxx", fail_open=False)
result = agent.governed_action(
action_type="database.query",
resource="production_db",
parameters={"query": "SELECT * FROM users"}
)
print(f"Result: {result}")
Testing with ASCEND
Test your agents without hitting production.
"""
Testing Patterns for ASCEND
Features:
- Mock client for unit tests
- Integration test helpers
- Fixture patterns
"""
import pytest
from unittest.mock import Mock, patch
from ascend import AscendClient, Decision
class MockAscendClient:
"""Mock ASCEND client for testing."""
def __init__(self, default_decision: Decision = Decision.ALLOWED):
self.default_decision = default_decision
self.calls = []
def evaluate_action(self, **kwargs):
self.calls.append(kwargs)
# Create mock decision
decision = Mock()
decision.decision = self.default_decision
decision.action_id = f"test_action_{len(self.calls)}"
decision.risk_score = 25
decision.reason = "Test decision"
decision.approval_request_id = "test_approval_123"
return decision
def log_action_completed(self, **kwargs):
self.calls.append({"type": "completed", **kwargs})
def log_action_failed(self, **kwargs):
self.calls.append({"type": "failed", **kwargs})
@pytest.fixture
def mock_ascend():
"""Pytest fixture for mock ASCEND client."""
return MockAscendClient()
@pytest.fixture
def mock_ascend_denied():
"""Fixture that returns denied decisions."""
return MockAscendClient(default_decision=Decision.DENIED)
class TestMyAgent:
"""Example test class."""
def test_allowed_action(self, mock_ascend):
"""Test that allowed actions execute."""
from my_agent import MyAgent
agent = MyAgent(client=mock_ascend)
result = agent.do_something()
assert result["status"] == "success"
assert len(mock_ascend.calls) >= 1
def test_denied_action(self, mock_ascend_denied):
"""Test that denied actions are handled."""
from my_agent import MyAgent
agent = MyAgent(client=mock_ascend_denied)
result = agent.do_something()
assert result["status"] == "denied"
def test_action_parameters(self, mock_ascend):
"""Test that correct parameters are sent."""
from my_agent import MyAgent
agent = MyAgent(client=mock_ascend)
agent.query_database("SELECT * FROM users")
# Verify call parameters
call = mock_ascend.calls[0]
assert call["action_type"] == "database.query"
assert "SELECT" in call["parameters"]["query"]
# Integration test with real ASCEND (staging)
@pytest.mark.integration
class TestAscendIntegration:
"""Integration tests against staging ASCEND."""
@pytest.fixture
def staging_client(self):
return AscendClient(
api_key="owkai_staging_xxx",
agent_id="test-agent",
agent_name="Integration Test Agent",
api_url="https://staging.owkai.app"
)
def test_connection(self, staging_client):
"""Test connection to staging."""
status = staging_client.test_connection()
assert status["status"] == "connected"
def test_low_risk_action(self, staging_client):
"""Test low-risk action is approved."""
decision = staging_client.evaluate_action(
action_type="database.query",
resource="test_db",
parameters={"query": "SELECT 1"}
)
assert decision.decision == Decision.ALLOWED