Skip to main content

AWS Cognito Integration

Overview

ASCEND uses AWS Cognito as the primary identity provider for interactive users. The integration implements enterprise-grade security with RS256 JWT validation, automatic JWKS key rotation, multi-pool support for multi-tenant deployments, and complete audit logging.

Why It Matters

AWS Cognito provides:

  • Managed Identity Service: Reduces operational burden and security risk of self-hosted identity
  • Enterprise Features: MFA, SAML/OIDC federation, advanced security features
  • Compliance Ready: SOC 2, HIPAA, PCI-DSS certifications from AWS
  • Scalability: Supports millions of users without infrastructure changes
  • Security: RSA key management, automatic rotation, DDoS protection

Architecture

Authentication Flow

+---------------+                    +----------------+
| Web Client | | AWS Cognito |
| (Browser) | | User Pool |
+-------+-------+ +-------+--------+
| |
| 1. Login Request |
+----------------------------------->|
| |
| 2. Challenge (Password/MFA) |
|<-----------------------------------+
| |
| 3. Challenge Response |
+----------------------------------->|
| |
| 4. JWT Tokens (ID, Access, Refresh)|
|<-----------------------------------+
| |
+-------+-------+ +-------+--------+
| Web Client | | ASCEND API |
+-------+-------+ +-------+--------+
| |
| 5. API Request + JWT |
+----------------------------------->|
| |
| +-------------------------+
| | 6. Fetch JWKS |
| | (Cached) |
| +-------+--------+--------+
| | |
| v |
| +-------+--------+
| | 7. Validate |
| | Signature |
| +-------+--------+
| |
| +-------+--------+
| | 8. Validate |
| | Claims |
| +-------+--------+
| |
| +-------+--------+
| | 9. Check |
| | Revocation |
| +-------+--------+
| |
| +-------+--------+
| | 10. Set RLS |
| | Context |
| +----------------+
| |
| 11. API Response |
|<-----------------------------------+

Multi-Pool Support (RBAC-001)

ASCEND supports two-pool JWT validation for enterprise multi-tenant deployments:

Pool TypePurposeToken ClaimsUse Case
Platform PoolASCEND internal staffscope='platform', org_id=nullPlatform operations
Per-Org PoolsTenant usersscope='org', org_id=[tenant_id]Tenant access
# Enable two-pool authentication
ENABLE_TWO_POOL_AUTH=true

# Platform pool (managed by ASCEND)
COGNITO_REGION=us-east-2
COGNITO_USER_POOL_ID=us-east-2_PlatformPool
COGNITO_APP_CLIENT_ID=platform_client_id

# Per-org pools stored in organizations table
# organizations.cognito_user_pool_id
# organizations.cognito_app_client_id

Configuration

Environment Variables

# Required - AWS Cognito Configuration
COGNITO_REGION=us-east-2
COGNITO_USER_POOL_ID=us-east-2_xxxxxxxxx
COGNITO_APP_CLIENT_ID=xxxxxxxxxxxxxxxxxxxxxxxxxx

# Optional - Multi-Pool Support
ENABLE_TWO_POOL_AUTH=false # Set true for multi-tenant deployment

# Optional - Security Settings
MAX_LOGIN_ATTEMPTS_PER_IP=5
MAX_LOGIN_ATTEMPTS_PER_EMAIL=10
LOGIN_ATTEMPT_WINDOW_MINUTES=15

Cognito User Pool Configuration

Required Settings in AWS Console:

{
"UserPoolName": "ascend-production",
"Policies": {
"PasswordPolicy": {
"MinimumLength": 12,
"RequireUppercase": true,
"RequireLowercase": true,
"RequireNumbers": true,
"RequireSymbols": true,
"TemporaryPasswordValidityDays": 1
}
},
"MfaConfiguration": "OPTIONAL", // or "ON" for required MFA
"UserPoolTags": {
"Environment": "production",
"Compliance": "soc2-hipaa-pci"
}
}

Required Custom Attributes:

AttributeTypeRequiredPurpose
custom:organization_idNumberYesTenant isolation
custom:organization_slugStringYesURL-safe org identifier
custom:roleStringYesRBAC role
custom:is_org_adminStringNoOrganization admin flag

App Client Configuration

{
"ClientName": "ascend-web-client",
"GenerateSecret": false,
"RefreshTokenValidity": 30,
"AccessTokenValidity": 1,
"IdTokenValidity": 1,
"TokenValidityUnits": {
"AccessToken": "hours",
"IdToken": "hours",
"RefreshToken": "days"
},
"ExplicitAuthFlows": [
"ALLOW_USER_PASSWORD_AUTH",
"ALLOW_REFRESH_TOKEN_AUTH",
"ALLOW_USER_SRP_AUTH"
],
"PreventUserExistenceErrors": "ENABLED"
}

JWT Token Structure

ID Token Claims

{
"sub": "12345678-1234-1234-1234-123456789012",
"aud": "app_client_id",
"email_verified": true,
"token_use": "id",
"auth_time": 1705766400,
"iss": "https://cognito-idp.us-east-2.amazonaws.com/us-east-2_xxxxxxxxx",
"cognito:username": "user@example.com",
"exp": 1705770000,
"iat": 1705766400,
"jti": "unique-token-id",
"email": "user@example.com",
"custom:organization_id": "123",
"custom:organization_slug": "acme-corp",
"custom:role": "admin",
"custom:is_org_admin": "true"
}

Validation Checks

CheckValueFailure Response
AlgorithmRS256401 - Invalid token algorithm
SignatureJWKS public key401 - Invalid signature
Issuer (iss)Cognito pool URL401 - Invalid issuer
Audience (aud)App client ID401 - Invalid audience
Expiration (exp)Current timestamp401 - Token expired
Token Use"id"401 - Invalid token use
Organization IDPresent and valid403 - Invalid organization

JWKS Key Management

Automatic Key Rotation

@lru_cache(maxsize=1)
def get_cognito_public_keys() -> Dict[str, RSAKey]:
"""
Fetch and cache AWS Cognito public keys for JWT signature validation.

Keys are cached to reduce latency. Cache is automatically refreshed
when a signature validation fails with an unknown key ID.
"""
response = requests.get(COGNITO_JWKS_URL, timeout=10)
keys = response.json()["keys"]

public_keys = {}
for key_data in keys:
kid = key_data["kid"]
public_keys[kid] = jwk.construct(key_data)

return public_keys

Key Refresh Strategy

ScenarioAction
Normal operationUse cached keys
Unknown key IDRefresh cache, retry validation
Refresh failsReturn 503 (fail-secure)
Key rotation by CognitoAutomatic during next refresh

Brute Force Protection

Protection Layers

# IP-based protection
MAX_LOGIN_ATTEMPTS_PER_IP = 5 # Block after 5 failed attempts from same IP
LOGIN_ATTEMPT_WINDOW_MINUTES = 15

# Email-based protection
MAX_LOGIN_ATTEMPTS_PER_EMAIL = 10 # Block after 10 failed attempts for same email

Implementation

async def check_brute_force(email: str, ip_address: str, db: Session) -> bool:
"""
Check if login should be blocked due to brute force attempts.

Returns True if should block, False if allowed.
"""
cutoff_time = datetime.now() - timedelta(minutes=LOGIN_ATTEMPT_WINDOW_MINUTES)

# Check IP-based attempts
ip_attempts = db.query(LoginAttempt).filter(
LoginAttempt.ip_address == ip_address,
LoginAttempt.success == False,
LoginAttempt.attempted_at >= cutoff_time
).count()

if ip_attempts >= MAX_LOGIN_ATTEMPTS_PER_IP:
return True # Block

# Check email-based attempts
email_attempts = db.query(LoginAttempt).filter(
LoginAttempt.email == email.lower(),
LoginAttempt.success == False,
LoginAttempt.attempted_at >= cutoff_time
).count()

if email_attempts >= MAX_LOGIN_ATTEMPTS_PER_EMAIL:
return True # Block

return False # Allow

Token Revocation

Tracking Tokens

async def track_token(
token_jti: str,
user_id: int,
cognito_user_id: str,
organization_id: int,
token_type: str,
expires_at: datetime,
db: Session
):
"""Track issued token for revocation support."""
token_record = CognitoToken(
user_id=user_id,
cognito_user_id=cognito_user_id,
organization_id=organization_id,
token_jti=token_jti,
token_type=token_type,
issued_at=datetime.now(),
expires_at=expires_at,
is_revoked=False
)
db.add(token_record)
db.commit()

Revoking Tokens

async def revoke_token(token_jti: str, reason: str, db: Session):
"""Revoke a token immediately."""
token_record = db.query(CognitoToken).filter(
CognitoToken.token_jti == token_jti
).first()

if token_record:
token_record.is_revoked = True
token_record.revoked_at = datetime.now()
token_record.revocation_reason = reason
db.commit()

Revocation Check

async def check_token_revoked(token_jti: str, db: Session) -> bool:
"""Check if token has been revoked."""
token_record = db.query(CognitoToken).filter(
CognitoToken.token_jti == token_jti
).first()

if token_record and token_record.is_revoked:
return True # Token is revoked

return False # Token is valid

Audit Logging

All authentication events are logged for compliance:

def log_auth_event(
event_type: str,
success: bool,
user_id: Optional[int] = None,
cognito_user_id: Optional[str] = None,
organization_id: Optional[int] = None,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
failure_reason: Optional[str] = None,
metadata: Optional[dict] = None,
db: Session = None
):
"""Log authentication event to auth_audit_log for compliance."""
audit_log = AuthAuditLog(
event_type=event_type,
user_id=user_id,
cognito_user_id=cognito_user_id,
organization_id=organization_id,
ip_address=ip_address,
user_agent=user_agent,
success=success,
failure_reason=failure_reason,
audit_metadata=metadata,
timestamp=datetime.now()
)
db.add(audit_log)
db.commit()

Event Types

Event TypeDescriptionLogged Data
cognito_loginUser login attemptemail, IP, success, failure_reason
cognito_logoutUser logoutuser_id, session_duration
token_refreshToken refreshuser_id, old_jti, new_jti
token_revokedToken revocationuser_id, jti, reason
brute_force_blockedBrute force attempt blockedIP, email, attempt_count

Fail-Secure Behavior

ScenarioResponseHTTP Status
Missing Authorization header"Missing or invalid Authorization header"401
JWKS fetch fails"Authentication service temporarily unavailable"503
Unknown key ID (after refresh)"Invalid authentication token"401
Invalid signature"Invalid authentication token"401
Invalid claims"Invalid token claims"401
Token expired"Token has expired"401
Token revoked"Token has been revoked"401
Missing organization_id"Token missing required custom attribute"401
Organization not found"Organization not found"403
Organization suspended"Organization subscription is [status]"403
Cognito unavailable"Authentication service temporarily unavailable"503

Compliance Mapping

FrameworkControlImplementation
SOC 2CC6.1RS256 signature validation, JWKS rotation
HIPAA164.312(d)Person authentication via Cognito
PCI-DSSReq 8.2Strong cryptographic authentication
PCI-DSSReq 8.3MFA support via Cognito
NIST 800-63AAL2Multi-factor authentication
NIST 800-63AAL3Cryptographic token verification

Verification

Test Login Flow

# 1. Initiate login
curl -X POST https://api.ascend.io/v1/auth/login \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"password": "SecurePassword123!"
}'

# Response includes tokens
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
"id_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJjdHkiOiJKV1QiLCJlbmMiOiJBMjU2R0NNIi...",
"expires_in": 3600
}

Test Token Validation

# Use ID token for API access
curl -X GET https://api.ascend.io/v1/auth/me \
-H "Authorization: Bearer $ID_TOKEN"

# Response
{
"user_id": 123,
"cognito_user_id": "12345678-1234-1234-1234-123456789012",
"email": "user@example.com",
"organization_id": 1,
"organization_name": "Acme Corp",
"role": "admin",
"is_org_admin": true,
"auth_method": "cognito"
}

Test Token Refresh

curl -X POST https://api.ascend.io/v1/auth/refresh \
-H "Content-Type: application/json" \
-d '{
"refresh_token": "eyJjdHkiOiJKV1QiLCJlbmMiOiJBMjU2R0NNIi..."
}'

Next Steps