Skip to main content

Audit Logging

ASCEND provides comprehensive, immutable audit logging with cryptographic hash-chaining for every action, decision, and system event. Meet compliance requirements and investigate incidents with tamper-proof visibility.

Source: services/immutable_audit_service.py

Immutable Audit Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│ IMMUTABLE AUDIT LOG CHAIN │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Log #1 │ │ Log #2 │ │ Log #3 │ │
│ ├──────────────┤ ├──────────────┤ ├──────────────┤ │
│ │ Seq: 1 │──► │ Seq: 2 │──► │ Seq: 3 │ │
│ │ Content Hash │ │ Content Hash │ │ Content Hash │ │
│ │ Prev: NULL │ │ Prev: hash#1 │ │ Prev: hash#2 │ │
│ │ Chain: ABC │ │ Chain: DEF │ │ Chain: GHI │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ Chain Hash = SHA256(JSON{content_hash, previous_hash}); the first entry uses "genesis" as previous_hash │
│ Tampering Detection: Recalculate chain, compare with stored hashes │
│ │
└─────────────────────────────────────────────────────────────────────────┘

Core Logging Service

Source: services/immutable_audit_service.py:16-187

The ImmutableAuditService Class

# Source: immutable_audit_service.py:16-20
class ImmutableAuditService:
    """Enterprise-grade immutable audit logging service.

    Holds the database session that the service's methods use to read
    and write audit records.
    """

    def __init__(self, db: Session):
        # The session is stored as-is; nothing is opened or queried here.
        self.db = db

Creating Audit Log Entries

Source: immutable_audit_service.py:22-80

The log_event Method

# Source: immutable_audit_service.py:22-75
def log_event(
    self,
    event_type: str,
    actor_id: str,
    resource_type: str,
    resource_id: str,
    action: str,
    event_data: Dict[str, Any],
    risk_level: str = "MEDIUM",
    compliance_tags: List[str] = None,
    ip_address: str = None,
    user_agent: str = None,
    session_id: str = None
) -> ImmutableAuditLog:
    """Create an immutable audit log entry and link it into the hash chain.

    Args:
        event_type: Type of event (e.g. "action.submitted", "user.login").
        actor_id: ID of the user/agent performing the action.
        resource_type: Type of resource (e.g. "database", "api_key").
        resource_id: Specific resource identifier.
        action: Action performed (e.g. "read", "write", "delete").
        event_data: Detailed event data; it is JSON-serialized when the
            content hash is calculated.
        risk_level: Risk classification, "MEDIUM" by default.
        compliance_tags: Compliance frameworks (e.g. ["SOX", "HIPAA"]);
            stored as an empty list when omitted and used to derive
            ``retention_until``.
        ip_address: Source IP address.
        user_agent: Browser/client user agent.
        session_id: Session identifier.

    Returns:
        The committed ImmutableAuditLog, refreshed from the database.

    Raises:
        Exception: Any failure is logged, the session is rolled back and
            the original exception is re-raised unchanged.
    """
    try:
        # Head of the chain: the entry with the highest sequence number.
        # NOTE(review): nothing visible here serializes this read with the
        # insert below, so two concurrent writers could link to the same
        # head -- confirm how that is prevented.
        last_log = self.db.query(ImmutableAuditLog).order_by(
            desc(ImmutableAuditLog.sequence_number)
        ).first()

        # Column-level defaults are only applied when the row is INSERTed,
        # but calculate_content_hash() reads self.timestamp before that.
        # Set the timestamp explicitly so the hash is computed over the
        # value that is actually stored.
        # NOTE(review): organization_id and sequence_number are NOT NULL on
        # the model but are not assigned here -- presumably populated
        # elsewhere (database sequence/default); confirm.
        audit_log = ImmutableAuditLog(
            event_type=event_type,
            actor_id=actor_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action=action,
            event_data=event_data,
            risk_level=risk_level,
            compliance_tags=compliance_tags or [],
            ip_address=ip_address,
            user_agent=user_agent,
            session_id=session_id,
            timestamp=datetime.now(UTC)
        )

        # Content hash first: the chain hash is derived from it.
        audit_log.content_hash = audit_log.calculate_content_hash()

        # The very first entry has no predecessor, hence previous_hash=None.
        previous_hash = last_log.chain_hash if last_log else None
        audit_log.previous_hash = previous_hash
        audit_log.chain_hash = audit_log.calculate_chain_hash(previous_hash)

        # Retention is driven by the caller-supplied compliance tags.
        audit_log.retention_until = self._calculate_retention_date(compliance_tags)

        self.db.add(audit_log)
        self.db.commit()
        self.db.refresh(audit_log)

        logger.info(f"Immutable audit log created: {audit_log.id}")
        return audit_log

    except Exception as e:
        # Leave the session usable for the caller, then propagate.
        self.db.rollback()
        logger.error(f"Failed to create audit log: {str(e)}")
        raise

Event Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| event_type | string | Yes | Type of event (e.g., "action.submitted", "user.login") |
| actor_id | string | Yes | ID of user/agent performing action |
| resource_type | string | Yes | Type of resource (e.g., "database", "api_key") |
| resource_id | string | Yes | Specific resource identifier |
| action | string | Yes | Action performed (e.g., "read", "write", "delete") |
| event_data | Dict | Yes | Detailed event data (stored as JSON) |
| risk_level | string | No | Risk classification (default: "MEDIUM") |
| compliance_tags | List[str] | No | Compliance frameworks (e.g., ["SOX", "HIPAA"]) |
| ip_address | string | No | Source IP address |
| user_agent | string | No | Browser/client user agent |
| session_id | string | No | Session identifier |

Hash-Chaining Mechanism

Content Hash Calculation

Source: models_audit.py (ImmutableAuditLog model)

def calculate_content_hash(self) -> str:
    """Return the SHA-256 hex digest of this entry's hashed fields.

    The digest covers event_type, actor_id, resource_type, resource_id,
    action, event_data, the ISO-8601 timestamp and risk_level. The fields
    are serialized as JSON with sorted keys so that key order cannot
    change the result.
    """
    plain_fields = (
        "event_type",
        "actor_id",
        "resource_type",
        "resource_id",
        "action",
        "event_data",
    )
    payload = {name: getattr(self, name) for name in plain_fields}
    # datetime is not JSON-serializable, so it is hashed in ISO-8601 form.
    payload["timestamp"] = self.timestamp.isoformat()
    payload["risk_level"] = self.risk_level

    serialized = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()

Chain Hash Calculation

def calculate_chain_hash(self, previous_hash: str = None) -> str:
    """Return the SHA-256 hex digest linking this entry to its predecessor.

    The digest is taken over a sorted-key JSON object holding this entry's
    ``content_hash`` and the predecessor's chain hash. A missing (falsy)
    ``previous_hash`` is replaced by the literal "genesis".
    """
    link = previous_hash if previous_hash else "genesis"
    serialized = json.dumps(
        {"previous_hash": link, "content_hash": self.content_hash},
        sort_keys=True,
    )
    digest = hashlib.sha256(serialized.encode())
    return digest.hexdigest()

WORM (Write-Once-Read-Many) Guarantee:

  • Each entry is cryptographically linked to the previous entry
  • Any modification breaks the chain
  • Tampering is detectable whenever the chain is re-verified (see Chain Integrity Verification)

Chain Integrity Verification

Source: immutable_audit_service.py:82-163

The verify_chain_integrity Method

# Source: immutable_audit_service.py:82-163
def verify_chain_integrity(
    self,
    start_sequence: int = None,
    end_sequence: int = None
) -> AuditIntegrityCheck:
    """Verify the audit log hash chain over an inclusive sequence range.

    For every entry in the range the content hash is recalculated and
    compared with the stored one, and the chain hash is recalculated from
    the predecessor's stored chain hash and compared with the stored one.
    The outcome is persisted as an AuditIntegrityCheck row.

    Args:
        start_sequence: First sequence number to check. Defaults to the
            lowest sequence number in the table (1 if the table is empty).
        end_sequence: Last sequence number to check. Defaults to the
            highest sequence number in the table (1 if the table is empty).

    Returns:
        The committed AuditIntegrityCheck. Its ``status`` is "TAMPERED"
        when any content hash mismatches, "BROKEN" when only chain hashes
        mismatch, and "VALID" otherwise.
    """
    start_time = datetime.now(UTC)

    # Get sequence range if not specified
    if start_sequence is None:
        start_sequence = self.db.query(func.min(ImmutableAuditLog.sequence_number)).scalar() or 1
    if end_sequence is None:
        end_sequence = self.db.query(func.max(ImmutableAuditLog.sequence_number)).scalar() or 1

    # Fetch audit logs in sequence order
    logs = self.db.query(ImmutableAuditLog).filter(
        and_(
            ImmutableAuditLog.sequence_number >= start_sequence,
            ImmutableAuditLog.sequence_number <= end_sequence
        )
    ).order_by(ImmutableAuditLog.sequence_number).all()

    # Seed the chain with the entry immediately before the range. Without
    # this, a range that starts mid-chain would recompute its first entry
    # against "genesis" and always be reported as broken. When the range
    # starts at the first entry there is no predecessor and the seed stays
    # None, exactly as for a full-chain check.
    predecessor = self.db.query(ImmutableAuditLog).filter(
        ImmutableAuditLog.sequence_number < start_sequence
    ).order_by(ImmutableAuditLog.sequence_number.desc()).first()
    previous_hash = predecessor.chain_hash if predecessor else None

    broken_chains = []
    invalid_hashes = []

    for log in logs:
        # Verify content hash
        expected_content_hash = log.calculate_content_hash()
        if log.content_hash != expected_content_hash:
            invalid_hashes.append({
                'sequence': log.sequence_number,
                'id': str(log.id),
                'expected': expected_content_hash,
                'actual': log.content_hash
            })

        # Verify chain hash
        expected_chain_hash = log.calculate_chain_hash(previous_hash)
        if log.chain_hash != expected_chain_hash:
            broken_chains.append({
                'sequence': log.sequence_number,
                'id': str(log.id),
                'expected': expected_chain_hash,
                'actual': log.chain_hash
            })

        # Continue from the stored hash so one bad link is reported once
        # instead of cascading through every later entry.
        previous_hash = log.chain_hash

    # Determine overall status: a content mismatch outranks a chain break.
    if broken_chains or invalid_hashes:
        status = "TAMPERED" if invalid_hashes else "BROKEN"
    else:
        status = "VALID"

    # Calculate performance metrics
    elapsed_seconds = (datetime.now(UTC) - start_time).total_seconds()
    duration_ms = int(elapsed_seconds * 1000)
    # Clamp the divisor so a near-instant check cannot divide by zero.
    records_per_second = int(len(logs) / max(elapsed_seconds, 0.001))

    # Create integrity check record
    integrity_check = AuditIntegrityCheck(
        start_sequence=start_sequence,
        end_sequence=end_sequence,
        total_records=len(logs),
        status=status,
        broken_chains=broken_chains if broken_chains else None,
        invalid_hashes=invalid_hashes if invalid_hashes else None,
        check_duration_ms=duration_ms,
        records_per_second=records_per_second,
        details={
            'check_range': f"{start_sequence}-{end_sequence}",
            'chain_breaks': len(broken_chains),
            'hash_failures': len(invalid_hashes)
        }
    )

    self.db.add(integrity_check)
    self.db.commit()

    logger.info(f"Chain integrity check completed: {status} ({len(logs)} records)")
    return integrity_check

Integrity Check Statuses

| Status | Meaning | Action |
|---|---|---|
| VALID | All hashes verify correctly | Normal operation |
| BROKEN | Chain links broken (data moved/reordered) | Investigate data migration |
| TAMPERED | Content hashes don't match (data modified) | Security incident - data tampering detected |

Compliance Retention

Source: immutable_audit_service.py:165-186

The _calculate_retention_date Method

# Source: immutable_audit_service.py:165-186
def _calculate_retention_date(self, compliance_tags: List[str]) -> datetime:
"""Calculate retention date based on compliance requirements"""
if not compliance_tags:
return datetime.now(UTC) + timedelta(days=2555) # 7 years default

# Compliance retention periods
retention_periods = {
'SOX': timedelta(days=2555), # 7 years
'HIPAA': timedelta(days=2190), # 6 years
'PCI': timedelta(days=365), # 1 year
'GDPR': timedelta(days=2190), # 6 years
'CCPA': timedelta(days=1095), # 3 years
'FERPA': timedelta(days=1825), # 5 years
}

# Use the longest retention period from applicable frameworks
max_retention = timedelta(days=2555) # Default 7 years
for tag in compliance_tags:
if tag in retention_periods:
max_retention = max(max_retention, retention_periods[tag])

return datetime.now(UTC) + max_retention

Compliance Retention Periods

| Framework | Retention Period | Days | Rationale |
|---|---|---|---|
| SOX (Sarbanes-Oxley) | 7 years | 2555 | Financial records retention |
| HIPAA | 6 years | 2190 | Healthcare records retention |
| PCI-DSS | 1 year | 365 | Payment card data audit logs |
| GDPR | 6 years | 2190 | EU data protection (can be longer) |
| CCPA | 3 years | 1095 | California Consumer Privacy Act |
| FERPA | 5 years | 1825 | Educational records |
| Default | 7 years | 2555 | Conservative enterprise default |

Automatic Selection: The system uses the longest applicable retention period when multiple compliance tags are specified.

Example Usage

Basic Event Logging

from services.immutable_audit_service import ImmutableAuditService

# Initialize service
# `db` is the database session the service stores and uses for all queries.
audit_service = ImmutableAuditService(db)

# Log an agent action
# The first six arguments are required; risk_level, compliance_tags,
# ip_address, user_agent and session_id are optional.
audit_log = audit_service.log_event(
event_type="action.submitted",
actor_id="agent_customer_service",
resource_type="customer_database",
resource_id="db_customers",
action="read",
event_data={
"query": "SELECT * FROM customers WHERE id = 12345",
"risk_score": 45,
"approved": True,
"approver": "manager@company.com"
},
risk_level="MEDIUM",
compliance_tags=["SOX", "HIPAA"],
ip_address="10.0.1.50",
user_agent="AscendSDK/1.0.0 Python/3.11",
session_id="sess_abc123"
)

# log_event returns the committed entry, so the hashes and the retention
# date it calculated can be read straight from the result.
print(f"Audit log created: {audit_log.id}")
print(f"Sequence number: {audit_log.sequence_number}")
print(f"Content hash: {audit_log.content_hash}")
print(f"Chain hash: {audit_log.chain_hash}")
print(f"Retention until: {audit_log.retention_until}")

Verifying Chain Integrity

# Verify the whole audit trail (no range given: lowest to highest sequence)
integrity_check = audit_service.verify_chain_integrity()
status = integrity_check.status

print(f"Status: {status}")
print(f"Total records: {integrity_check.total_records}")
print(f"Duration: {integrity_check.check_duration_ms}ms")
print(f"Records/second: {integrity_check.records_per_second}")

# Each failure status carries its own list of offending entries.
if status == "BROKEN":
    print("WARNING: Chain broken (data migration?)")
    print(f"Broken chains: {integrity_check.broken_chains}")
elif status == "TAMPERED":
    print("ALERT: Tampering detected!")
    print(f"Invalid hashes: {integrity_check.invalid_hashes}")
else:
    print("Chain integrity verified: VALID")

Verifying Specific Range

# Verify specific sequence range
# Both bounds are inclusive: the service selects entries whose
# sequence_number is >= start_sequence and <= end_sequence.
integrity_check = audit_service.verify_chain_integrity(
start_sequence=1000,
end_sequence=2000
)

print(f"Checked sequences 1000-2000: {integrity_check.status}")

Database Schema

Source: models_audit.py

ImmutableAuditLog Model

class ImmutableAuditLog(Base):
    """One hash-chained audit log entry (table ``immutable_audit_logs``)."""

    __tablename__ = "immutable_audit_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # NOTE(review): described as auto-incrementing, but no Sequence or
    # autoincrement is configured on this column here -- confirm it is
    # populated by the database.
    sequence_number = Column(Integer, unique=True, nullable=False, index=True)  # Auto-incrementing
    organization_id = Column(Integer, ForeignKey("organizations.id"), nullable=False, index=True)

    # Event identification
    event_type = Column(String(100), nullable=False, index=True)
    actor_id = Column(String(255), nullable=False, index=True)
    resource_type = Column(String(100), nullable=False)
    resource_id = Column(String(255), nullable=False)
    action = Column(String(100), nullable=False)

    # Event data
    event_data = Column(JSONB, nullable=False)
    risk_level = Column(String(20), nullable=False, index=True)
    compliance_tags = Column(postgresql.ARRAY(String), nullable=True)

    # Cryptographic integrity
    content_hash = Column(String(64), nullable=False)  # SHA-256
    previous_hash = Column(String(64), nullable=True)  # NULL for the first entry
    chain_hash = Column(String(64), nullable=False, unique=True)

    # Metadata
    # The default must be a callable: `default=datetime.now(UTC)` is evaluated
    # once when the class is defined, so every row would receive the
    # module-import time instead of its own insert time.
    timestamp = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
        nullable=False,
        index=True,
    )
    ip_address = Column(String(45), nullable=True)
    user_agent = Column(Text, nullable=True)
    session_id = Column(String(255), nullable=True, index=True)

    # Retention
    retention_until = Column(DateTime(timezone=True), nullable=False)
    is_archived = Column(Boolean, default=False)

AuditIntegrityCheck Model

class AuditIntegrityCheck(Base):
    """Result of one chain verification run (table ``audit_integrity_checks``)."""

    __tablename__ = "audit_integrity_checks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    start_sequence = Column(Integer, nullable=False)
    end_sequence = Column(Integer, nullable=False)
    total_records = Column(Integer, nullable=False)

    # Check results
    status = Column(String(20), nullable=False)  # VALID, BROKEN, TAMPERED
    broken_chains = Column(JSONB, nullable=True)
    invalid_hashes = Column(JSONB, nullable=True)

    # Performance metrics
    check_duration_ms = Column(Integer, nullable=False)
    records_per_second = Column(Integer, nullable=False)

    # Metadata
    # The default must be a callable: `default=datetime.now(UTC)` is evaluated
    # once when the class is defined, so every check would be stamped with
    # the module-import time instead of the time it was recorded.
    checked_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
        nullable=False,
    )
    details = Column(JSONB, nullable=True)

Event Types

Standard Event Types

| Event Type | Description | Example |
|---|---|---|
| action.submitted | Agent action submitted | Agent requests database access |
| action.approved | Action approved by reviewer | Manager approves action |
| action.denied | Action denied | Action blocked by policy |
| action.executed | Action executed | Database query executed |
| user.login | User logged in | User authenticated |
| user.logout | User logged out | User session ended |
| user.created | User account created | New user onboarded |
| policy.created | Governance policy created | New policy added |
| policy.updated | Governance policy modified | Policy rules changed |
| workflow.started | Approval workflow started | Workflow triggered |
| workflow.completed | Approval workflow completed | Workflow finished |
| apikey.created | API key generated | New API key created |
| apikey.revoked | API key revoked | API key deactivated |

Risk Levels

| Risk Level | Usage | Compliance Impact |
|---|---|---|
| CRITICAL | Critical security events | Immediate alert, extended retention |
| HIGH | High-risk actions | Priority review |
| MEDIUM | Standard operations | Normal processing |
| LOW | Low-risk activities | Background logging |
| INFO | Informational events | General tracking |

Performance Characteristics

Source: Based on verify_chain_integrity performance metrics

Typical Performance

  • Write Performance: 1,000+ entries/second (single-threaded)
  • Verification Performance: 5,000+ records/second (Python implementation)
  • Storage: ~500 bytes per entry (average)

Scalability

  • Sequential numbering with database sequence
  • Indexed by organization_id for multi-tenant performance
  • Hash calculation is CPU-bound (can be optimized with C extensions)

Best Practices

1. Include Comprehensive Event Data

# Good: Rich event data
# The query is parameterized and the sensitive parameter is masked, while
# workflow, justification and ticket references give the entry context.
audit_service.log_event(
event_type="action.submitted",
actor_id="agent_123",
resource_type="database",
resource_id="prod_db",
action="write",
event_data={
"query": "UPDATE customers SET email = ? WHERE id = ?",
"parameters": ["***masked***", "12345"],
"rows_affected": 1,
"risk_score": 75,
"workflow_id": "wf_456",
"business_justification": "Customer email update request",
"ticket_id": "JIRA-789"
},
risk_level="HIGH",
compliance_tags=["SOX", "GDPR"]
)

# Bad: Minimal event data
# risk_level and compliance_tags are omitted here, so log_event falls back
# to risk_level="MEDIUM" and stores an empty compliance tag list.
audit_service.log_event(
event_type="action",
actor_id="agent",
resource_type="db",
resource_id="db1",
action="write",
event_data={"query": "UPDATE customers"}
)

2. Tag with Compliance Frameworks

# Ensure proper retention by tagging compliance frameworks
# Illustrative fragment: the required log_event arguments are elided.
audit_service.log_event(
# ... other parameters ...
compliance_tags=["SOX", "HIPAA", "GDPR"] # 7 years retention (SOX)
)

3. Regular Integrity Verification

# Schedule daily integrity checks
def daily_audit_check():
    """Verify the previous day's slice of the audit chain and alert on failure."""
    target_day = datetime.now(UTC) - timedelta(days=1)
    first_seq = get_first_sequence_of_day(target_day)
    last_seq = get_last_sequence_of_day(target_day)

    result = audit_service.verify_chain_integrity(first_seq, last_seq)
    if result.status == "VALID":
        return

    # Anything other than VALID is escalated together with the check record.
    alert_security_team(result)


# Run via cron/scheduler
schedule.every().day.at("00:00").do(daily_audit_check)

4. Mask Sensitive Data

# Don't log sensitive data in plain text
# Secrets are replaced by a fixed placeholder before they reach event_data.
# NOTE(review): the email and IP address are still stored in clear text in
# this example -- confirm that is acceptable under your data policy.
audit_service.log_event(
event_type="user.login",
actor_id=user_id,
resource_type="authentication",
resource_id="login_endpoint",
action="login",
event_data={
"email": user_email,
"ip_address": ip_address,
"password": "***REDACTED***", # NEVER log passwords
"mfa_code": "***REDACTED***", # NEVER log MFA codes
"success": True
}
)

Compliance Mapping

| Requirement | Standard | Implementation |
|---|---|---|
| Audit Trail | SOC 2 AU-6 | Hash-chained immutable logs |
| Integrity | SOC 2 PI-1 | Cryptographic verification |
| Retention | SOX | 7 years automatic retention |
| Retention | HIPAA | 6 years for healthcare |
| Log Review | PCI-DSS 10.6 | Integrity verification |
| Tamper Detection | NIST AU-7 | Chain verification algorithm |

Next Steps