AWS Cognito Integration
Overview
ASCEND uses AWS Cognito as the primary identity provider for interactive users. The integration implements enterprise-grade security with RS256 JWT validation, automatic JWKS key rotation, multi-pool support for multi-tenant deployments, and complete audit logging.
Why It Matters
AWS Cognito provides:
- Managed Identity Service: Reduces operational burden and security risk of self-hosted identity
- Enterprise Features: MFA, SAML/OIDC federation, advanced security features
- Compliance Ready: Cognito is in scope for AWS's SOC 2 and PCI DSS attestations and is a HIPAA-eligible service
- Scalability: Supports millions of users without infrastructure changes
- Security: RSA key management, automatic rotation, DDoS protection
Architecture
Authentication Flow
+---------------+                    +----------------+
|  Web Client   |                    |  AWS Cognito   |
|   (Browser)   |                    |   User Pool    |
+-------+-------+                    +-------+--------+
        |                                    |
        | 1. Login Request                   |
        +----------------------------------->|
        |                                    |
        | 2. Challenge (Password/MFA)        |
        |<-----------------------------------+
        |                                    |
        | 3. Challenge Response              |
        +----------------------------------->|
        |                                    |
        | 4. JWT Tokens (ID, Access, Refresh)|
        |<-----------------------------------+
        |                                    |
+-------+-------+                    +-------+--------+
|  Web Client   |                    |   ASCEND API   |
+-------+-------+                    +-------+--------+
        |                                    |
        | 5. API Request + JWT               |
        +----------------------------------->|
        |                                    |
        |                            +-------+--------+
        |                            | 6. Fetch JWKS  |
        |                            |    (Cached)    |
        |                            +-------+--------+
        |                                    |
        |                            +-------+--------+
        |                            | 7. Validate    |
        |                            |    Signature   |
        |                            +-------+--------+
        |                                    |
        |                            +-------+--------+
        |                            | 8. Validate    |
        |                            |    Claims      |
        |                            +-------+--------+
        |                                    |
        |                            +-------+--------+
        |                            | 9. Check       |
        |                            |    Revocation  |
        |                            +-------+--------+
        |                                    |
        |                            +-------+--------+
        |                            | 10. Set RLS    |
        |                            |     Context    |
        |                            +-------+--------+
        |                                    |
        | 11. API Response                   |
        |<-----------------------------------+
Multi-Pool Support (RBAC-001)
ASCEND supports two-pool JWT validation for enterprise multi-tenant deployments:
| Pool Type | Purpose | Token Claims | Use Case |
|---|---|---|---|
| Platform Pool | ASCEND internal staff | scope='platform', org_id=null | Platform operations |
| Per-Org Pools | Tenant users | scope='org', org_id=[tenant_id] | Tenant access |
# Enable two-pool authentication
ENABLE_TWO_POOL_AUTH=true
# Platform pool (managed by ASCEND)
COGNITO_REGION=us-east-2
COGNITO_USER_POOL_ID=us-east-2_PlatformPool
COGNITO_APP_CLIENT_ID=platform_client_id
# Per-org pools stored in organizations table
# organizations.cognito_user_pool_id
# organizations.cognito_app_client_id
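As a rough illustration of the two-pool lookup, the sketch below picks a pool from a token's `iss` claim. The names `PoolConfig` and `resolve_pool` are hypothetical, and the `org_pools` dictionary stands in for the `organizations` table; the actual ASCEND lookup may differ.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class PoolConfig:
    """Hypothetical holder for one Cognito pool's settings."""
    region: str
    user_pool_id: str
    app_client_id: str
    scope: str              # 'platform' or 'org'
    org_id: Optional[int]   # None for the platform pool

    @property
    def issuer(self) -> str:
        # Cognito issuer URLs follow this fixed pattern
        return f"https://cognito-idp.{self.region}.amazonaws.com/{self.user_pool_id}"


def resolve_pool(
    issuer: str,
    platform: PoolConfig,
    org_pools: Dict[str, PoolConfig],
    two_pool_enabled: bool,
) -> Optional[PoolConfig]:
    """Return the pool that issued the token, or None to reject it."""
    if issuer == platform.issuer:
        return platform
    if two_pool_enabled:
        # org_pools is keyed by issuer URL (stand-in for the organizations table)
        return org_pools.get(issuer)
    return None  # Unknown issuer: fail secure
```

Returning `None` for any issuer that is not registered keeps the default outcome a rejection, so a token from an unrelated pool never reaches signature validation.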
Configuration
Environment Variables
# Required - AWS Cognito Configuration
COGNITO_REGION=us-east-2
COGNITO_USER_POOL_ID=us-east-2_xxxxxxxxx
COGNITO_APP_CLIENT_ID=xxxxxxxxxxxxxxxxxxxxxxxxxx
# Optional - Multi-Pool Support
ENABLE_TWO_POOL_AUTH=false # Set true for multi-tenant deployment
# Optional - Security Settings
MAX_LOGIN_ATTEMPTS_PER_IP=5
MAX_LOGIN_ATTEMPTS_PER_EMAIL=10
LOGIN_ATTEMPT_WINDOW_MINUTES=15
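A minimal sketch of reading these variables at startup, assuming the defaults shown above. `CognitoSettings` and `load_settings` are hypothetical names, not part of ASCEND; the JWKS URL pattern is the standard Cognito one.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = ("COGNITO_REGION", "COGNITO_USER_POOL_ID", "COGNITO_APP_CLIENT_ID")


@dataclass(frozen=True)
class CognitoSettings:
    region: str
    user_pool_id: str
    app_client_id: str
    enable_two_pool_auth: bool = False
    max_login_attempts_per_ip: int = 5
    max_login_attempts_per_email: int = 10
    login_attempt_window_minutes: int = 15

    @property
    def issuer(self) -> str:
        return f"https://cognito-idp.{self.region}.amazonaws.com/{self.user_pool_id}"

    @property
    def jwks_url(self) -> str:
        return f"{self.issuer}/.well-known/jwks.json"


def load_settings(env: Mapping[str, str] = os.environ) -> CognitoSettings:
    """Build settings from environment variables, failing fast if any are missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return CognitoSettings(
        region=env["COGNITO_REGION"],
        user_pool_id=env["COGNITO_USER_POOL_ID"],
        app_client_id=env["COGNITO_APP_CLIENT_ID"],
        enable_two_pool_auth=env.get("ENABLE_TWO_POOL_AUTH", "false").strip().lower() == "true",
        max_login_attempts_per_ip=int(env.get("MAX_LOGIN_ATTEMPTS_PER_IP", "5")),
        max_login_attempts_per_email=int(env.get("MAX_LOGIN_ATTEMPTS_PER_EMAIL", "10")),
        login_attempt_window_minutes=int(env.get("LOGIN_ATTEMPT_WINDOW_MINUTES", "15")),
    )
```

Failing at startup when a required variable is absent avoids discovering the gap on the first login request.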
Cognito User Pool Configuration
Required settings in the AWS Console (set "MfaConfiguration" to "ON" to require MFA):
{
  "UserPoolName": "ascend-production",
  "Policies": {
    "PasswordPolicy": {
      "MinimumLength": 12,
      "RequireUppercase": true,
      "RequireLowercase": true,
      "RequireNumbers": true,
      "RequireSymbols": true,
      "TemporaryPasswordValidityDays": 1
    }
  },
  "MfaConfiguration": "OPTIONAL",
  "UserPoolTags": {
    "Environment": "production",
    "Compliance": "soc2-hipaa-pci"
  }
}
Required Custom Attributes:
| Attribute | Type | Required | Purpose |
|---|---|---|---|
| custom:organization_id | Number | Yes | Tenant isolation |
| custom:organization_slug | String | Yes | URL-safe org identifier |
| custom:role | String | Yes | RBAC role |
| custom:is_org_admin | String | No | Organization admin flag |
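Custom attributes arrive in the token as strings (for example `"123"` and `"true"`), so they need converting before use. The sketch below shows one way to do that; `TenantContext`, `MissingAttributeError`, and `parse_tenant_context` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Mapping

REQUIRED_ATTRIBUTES = (
    "custom:organization_id",
    "custom:organization_slug",
    "custom:role",
)


class MissingAttributeError(ValueError):
    """Raised when a required custom attribute is absent or malformed."""


@dataclass(frozen=True)
class TenantContext:
    organization_id: int
    organization_slug: str
    role: str
    is_org_admin: bool


def parse_tenant_context(claims: Mapping[str, Any]) -> TenantContext:
    """Convert string-valued custom attributes into typed fields."""
    for name in REQUIRED_ATTRIBUTES:
        if not claims.get(name):
            raise MissingAttributeError(f"Token missing required custom attribute: {name}")
    try:
        org_id = int(claims["custom:organization_id"])
    except (TypeError, ValueError) as exc:
        raise MissingAttributeError("custom:organization_id is not an integer") from exc
    return TenantContext(
        organization_id=org_id,
        organization_slug=str(claims["custom:organization_slug"]),
        role=str(claims["custom:role"]),
        # Optional attribute: anything other than "true" counts as False
        is_org_admin=str(claims.get("custom:is_org_admin", "false")).lower() == "true",
    )
```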
App Client Configuration
{
  "ClientName": "ascend-web-client",
  "GenerateSecret": false,
  "RefreshTokenValidity": 30,
  "AccessTokenValidity": 1,
  "IdTokenValidity": 1,
  "TokenValidityUnits": {
    "AccessToken": "hours",
    "IdToken": "hours",
    "RefreshToken": "days"
  },
  "ExplicitAuthFlows": [
    "ALLOW_USER_PASSWORD_AUTH",
    "ALLOW_REFRESH_TOKEN_AUTH",
    "ALLOW_USER_SRP_AUTH"
  ],
  "PreventUserExistenceErrors": "ENABLED"
}
JWT Token Structure
ID Token Claims
{
  "sub": "12345678-1234-1234-1234-123456789012",
  "aud": "app_client_id",
  "email_verified": true,
  "token_use": "id",
  "auth_time": 1705766400,
  "iss": "https://cognito-idp.us-east-2.amazonaws.com/us-east-2_xxxxxxxxx",
  "cognito:username": "user@example.com",
  "exp": 1705770000,
  "iat": 1705766400,
  "jti": "unique-token-id",
  "email": "user@example.com",
  "custom:organization_id": "123",
  "custom:organization_slug": "acme-corp",
  "custom:role": "admin",
  "custom:is_org_admin": "true"
}
Validation Checks
| Check | Value | Failure Response |
|---|---|---|
| Algorithm | RS256 | 401 - Invalid token algorithm |
| Signature | JWKS public key | 401 - Invalid signature |
| Issuer (iss) | Cognito pool URL | 401 - Invalid issuer |
| Audience (aud) | App client ID | 401 - Invalid audience |
| Expiration (exp) | Later than the current time | 401 - Token expired |
| Token Use | "id" | 401 - Invalid token use |
| Organization ID | Present and valid | 403 - Invalid organization |
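The checks in this table, other than the signature, can be sketched as a single function over an already-decoded header and claim set. This is an illustration only: `validate_claims` is a hypothetical name, signature verification is assumed to have happened beforehand, and "valid" for the organization ID is simplified here to "numeric".

```python
import time
from typing import Any, Mapping, Optional, Tuple


def validate_claims(
    header: Mapping[str, Any],
    claims: Mapping[str, Any],
    *,
    issuer: str,
    client_id: str,
    now: Optional[float] = None,
) -> Optional[Tuple[int, str]]:
    """Return None when every check passes, else (HTTP status, message)."""
    now = time.time() if now is None else now

    if header.get("alg") != "RS256":
        return 401, "Invalid token algorithm"
    if claims.get("iss") != issuer:
        return 401, "Invalid issuer"
    if claims.get("aud") != client_id:
        return 401, "Invalid audience"

    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp <= now:
        return 401, "Token expired"
    if claims.get("token_use") != "id":
        return 401, "Invalid token use"

    # Custom attributes are delivered as strings, e.g. "123"
    if not str(claims.get("custom:organization_id", "")).isdigit():
        return 403, "Invalid organization"
    return None
```

Passing `now` explicitly makes the expiry check deterministic in tests; production callers would leave it unset.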
JWKS Key Management
Automatic Key Rotation
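from functools import lru_cache
from typing import Dict

import requests
from jose import jwk  # Assumes python-jose, whose jwk.construct matches the call below
from jose.backends import RSAKey


@lru_cache(maxsize=1)
def get_cognito_public_keys() -> Dict[str, RSAKey]:
    """
    Fetch and cache AWS Cognito public keys for JWT signature validation.

    Keys are cached to reduce latency. The cache is refreshed when a signature
    validation fails with an unknown key ID (with lru_cache, by calling
    get_cognito_public_keys.cache_clear() before retrying).
    """
    response = requests.get(COGNITO_JWKS_URL, timeout=10)
    response.raise_for_status()  # Do not cache an error response as a key set
    keys = response.json()["keys"]
    public_keys = {}
    for key_data in keys:
        # Index by key ID so the token header's "kid" selects the right key
        kid = key_data["kid"]
        public_keys[kid] = jwk.construct(key_data)
    return public_keys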
Key Refresh Strategy
| Scenario | Action |
|---|---|
| Normal operation | Use cached keys |
| Unknown key ID | Refresh cache, retry validation |
| Refresh fails | Return 503 (fail-secure) |
| Key rotation by Cognito | Automatic during next refresh |
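The refresh strategy in this table can be sketched as a small cache with an injectable fetch function. `JWKSCache` and `JWKSUnavailableError` are hypothetical names, and the fetcher is passed in so the example runs without network access; it is not the ASCEND implementation.

```python
from typing import Any, Callable, Dict, Optional


class JWKSUnavailableError(Exception):
    """Raised when keys cannot be fetched; callers would map this to HTTP 503."""


class JWKSCache:
    def __init__(self, fetch: Callable[[], Dict[str, Any]]):
        self._fetch = fetch  # Returns a mapping of kid -> public key
        self._keys: Optional[Dict[str, Any]] = None

    def _refresh(self) -> None:
        try:
            self._keys = dict(self._fetch())
        except Exception as exc:
            # Fail secure: never fall through to "valid" when keys are unavailable
            raise JWKSUnavailableError("JWKS fetch failed") from exc

    def get_key(self, kid: str) -> Optional[Any]:
        """Return the key for kid, refreshing once if the kid is unknown."""
        if self._keys is None:
            self._refresh()          # First use: load the key set
        elif kid not in self._keys:
            self._refresh()          # Unknown kid: Cognito may have rotated keys
        return self._keys.get(kid)   # Still None after refresh means reject (401)
```

Because any unknown `kid` triggers a fetch, a production version would likely also rate-limit refreshes so that tokens with random key IDs cannot force repeated calls to Cognito.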
Brute Force Protection
Protection Layers
# IP-based protection
MAX_LOGIN_ATTEMPTS_PER_IP = 5 # Block after 5 failed attempts from same IP
LOGIN_ATTEMPT_WINDOW_MINUTES = 15
# Email-based protection
MAX_LOGIN_ATTEMPTS_PER_EMAIL = 10 # Block after 10 failed attempts for same email
Implementation
from datetime import datetime, timedelta

from sqlalchemy.orm import Session


async def check_brute_force(email: str, ip_address: str, db: Session) -> bool:
    """
    Check if login should be blocked due to brute force attempts.

    Returns True if the login should be blocked, False if it is allowed.
    """
    # Only failed attempts inside the sliding window count toward the limits
    cutoff_time = datetime.now() - timedelta(minutes=LOGIN_ATTEMPT_WINDOW_MINUTES)

    # Check IP-based attempts
    ip_attempts = db.query(LoginAttempt).filter(
        LoginAttempt.ip_address == ip_address,
        LoginAttempt.success == False,  # noqa: E712 (SQLAlchemy column comparison)
        LoginAttempt.attempted_at >= cutoff_time
    ).count()
    if ip_attempts >= MAX_LOGIN_ATTEMPTS_PER_IP:
        return True  # Block

    # Check email-based attempts
    email_attempts = db.query(LoginAttempt).filter(
        LoginAttempt.email == email.lower(),
        LoginAttempt.success == False,  # noqa: E712
        LoginAttempt.attempted_at >= cutoff_time
    ).count()
    if email_attempts >= MAX_LOGIN_ATTEMPTS_PER_EMAIL:
        return True  # Block

    return False  # Allow
Token Revocation
Tracking Tokens
async def track_token(
    token_jti: str,
    user_id: int,
    cognito_user_id: str,
    organization_id: int,
    token_type: str,
    expires_at: datetime,
    db: Session
):
    """Track issued token for revocation support."""
    token_record = CognitoToken(
        user_id=user_id,
        cognito_user_id=cognito_user_id,
        organization_id=organization_id,
        token_jti=token_jti,
        token_type=token_type,
        issued_at=datetime.now(),
        expires_at=expires_at,
        is_revoked=False
    )
    db.add(token_record)
    db.commit()
Revoking Tokens
async def revoke_token(token_jti: str, reason: str, db: Session):
    """Revoke a token immediately."""
    token_record = db.query(CognitoToken).filter(
        CognitoToken.token_jti == token_jti
    ).first()
    if token_record:
        token_record.is_revoked = True
        token_record.revoked_at = datetime.now()
        token_record.revocation_reason = reason
        db.commit()
Revocation Check
async def check_token_revoked(token_jti: str, db: Session) -> bool:
    """Check if token has been revoked."""
    token_record = db.query(CognitoToken).filter(
        CognitoToken.token_jti == token_jti
    ).first()
    if token_record and token_record.is_revoked:
        return True  # Token is revoked
    return False  # Token is not revoked (or is not tracked)
Audit Logging
All authentication events are logged for compliance:
def log_auth_event(
    event_type: str,
    success: bool,
    user_id: Optional[int] = None,
    cognito_user_id: Optional[str] = None,
    organization_id: Optional[int] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
    failure_reason: Optional[str] = None,
    metadata: Optional[dict] = None,
    db: Optional[Session] = None
):
    """Log authentication event to auth_audit_log for compliance."""
    if db is None:
        # Without a session the event would be silently lost
        raise ValueError("log_auth_event requires a database session")
    audit_log = AuthAuditLog(
        event_type=event_type,
        user_id=user_id,
        cognito_user_id=cognito_user_id,
        organization_id=organization_id,
        ip_address=ip_address,
        user_agent=user_agent,
        success=success,
        failure_reason=failure_reason,
        audit_metadata=metadata,
        timestamp=datetime.now()
    )
    db.add(audit_log)
    db.commit()
Event Types
| Event Type | Description | Logged Data |
|---|---|---|
| cognito_login | User login attempt | email, IP, success, failure_reason |
| cognito_logout | User logout | user_id, session_duration |
| token_refresh | Token refresh | user_id, old_jti, new_jti |
| token_revoked | Token revocation | user_id, jti, reason |
| brute_force_blocked | Brute force attempt blocked | IP, email, attempt_count |
Fail-Secure Behavior
| Scenario | Response | HTTP Status |
|---|---|---|
| Missing Authorization header | "Missing or invalid Authorization header" | 401 |
| JWKS fetch fails | "Authentication service temporarily unavailable" | 503 |
| Unknown key ID (after refresh) | "Invalid authentication token" | 401 |
| Invalid signature | "Invalid authentication token" | 401 |
| Invalid claims | "Invalid token claims" | 401 |
| Token expired | "Token has expired" | 401 |
| Token revoked | "Token has been revoked" | 401 |
| Missing organization_id | "Token missing required custom attribute" | 401 |
| Organization not found | "Organization not found" | 403 |
| Organization suspended | "Organization subscription is [status]" | 403 |
| Cognito unavailable | "Authentication service temporarily unavailable" | 503 |
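One way to keep these responses consistent is to give each failure its own exception type carrying the status and message. The sketch below covers a subset of the rows; the class names and `to_response` are hypothetical, and the choice to answer unexpected errors with a generic 401 is an assumption rather than documented ASCEND behavior.

```python
from typing import Tuple


class AuthError(Exception):
    """Base class: the default is a generic 401 that reveals no detail."""
    status = 401
    message = "Invalid authentication token"


class TokenExpired(AuthError):
    message = "Token has expired"


class TokenRevoked(AuthError):
    message = "Token has been revoked"


class OrganizationNotFound(AuthError):
    status = 403
    message = "Organization not found"


class AuthServiceUnavailable(AuthError):
    status = 503
    message = "Authentication service temporarily unavailable"


def to_response(exc: Exception) -> Tuple[int, str]:
    """Map any exception raised during authentication to (status, message)."""
    if isinstance(exc, AuthError):
        return exc.status, exc.message
    # Assumption: an unexpected error is denied rather than allowed through
    return AuthError.status, AuthError.message
```

The important property is that no code path maps an error to a successful response: anything unrecognized still results in a denial.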
Compliance Mapping
| Framework | Control | Implementation |
|---|---|---|
| SOC 2 | CC6.1 | RS256 signature validation, JWKS rotation |
| HIPAA | 164.312(d) | Person authentication via Cognito |
| PCI-DSS | Req 8.2 | Strong cryptographic authentication |
| PCI-DSS | Req 8.3 | MFA support via Cognito |
| NIST 800-63 | AAL2 | Multi-factor authentication |
| NIST 800-63 | AAL3 | Cryptographic token verification |
Verification
Test Login Flow
# 1. Initiate login
curl -X POST https://api.ascend.io/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{
    "email": "user@example.com",
    "password": "SecurePassword123!"
  }'

# Response includes tokens
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "id_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJjdHkiOiJKV1QiLCJlbmMiOiJBMjU2R0NNIi...",
  "expires_in": 3600
}
Test Token Validation
# Use ID token for API access
curl -X GET https://api.ascend.io/v1/auth/me \
  -H "Authorization: Bearer $ID_TOKEN"

# Response
{
  "user_id": 123,
  "cognito_user_id": "12345678-1234-1234-1234-123456789012",
  "email": "user@example.com",
  "organization_id": 1,
  "organization_name": "Acme Corp",
  "role": "admin",
  "is_org_admin": true,
  "auth_method": "cognito"
}
Test Token Refresh
curl -X POST https://api.ascend.io/v1/auth/refresh \
  -H "Content-Type: application/json" \
  -d '{
    "refresh_token": "eyJjdHkiOiJKV1QiLCJlbmMiOiJBMjU2R0NNIi..."
  }'
Next Steps
- JWT Token Management - Detailed JWT validation
- API Key Authentication - SDK authentication
- Authorization Overview - RBAC configuration