Security & Compliance
Introduction to Agent Security
Security is critical for production agent systems. This section covers authentication, authorization, data protection, and compliance requirements.
Security Principles
Defense in Depth: Multiple layers of security Least Privilege: Minimum necessary access Zero Trust: Verify everything Encryption: Protect data at rest and in transit Audit Everything: Complete logging
Threat Model
Threats:
- Unauthorized access
- Data breaches
- Prompt injection
- Model manipulation
- Resource exhaustion
- Privacy violations
Authentication and Authorization
JWT-Based Authentication
import jwt
from datetime import datetime, timedelta
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional, Dict
class AuthManager:
"""JWT-based authentication"""
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
self.security = HTTPBearer()
def create_token(self,
user_id: str,
roles: List[str],
expires_in: int = 3600) -> str:
"""Create JWT token"""
payload = {
"user_id": user_id,
"roles": roles,
"exp": datetime.utcnow() + timedelta(seconds=expires_in),
"iat": datetime.utcnow()
}
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
return token
def verify_token(self, token: str) -> Dict:
"""Verify and decode JWT token"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_current_user(self,
credentials: HTTPAuthorizationCredentials = Security(HTTPBearer())):
"""Get current user from token"""
token = credentials.credentials
payload = self.verify_token(token)
return {
"user_id": payload["user_id"],
"roles": payload["roles"]
}
# Role-Based Access Control
class RBACManager:
"""Role-based access control"""
def __init__(self):
self.permissions = {
"admin": ["read", "write", "delete", "admin"],
"user": ["read", "write"],
"viewer": ["read"]
}
def has_permission(self, roles: List[str], required_permission: str) -> bool:
"""Check if roles have required permission"""
for role in roles:
if role in self.permissions:
if required_permission in self.permissions[role]:
return True
return False
def require_permission(self, permission: str):
"""Decorator to require permission"""
def decorator(func):
async def wrapper(*args, **kwargs):
# Get user from context
user = kwargs.get('current_user')
if not user:
raise HTTPException(status_code=401, detail="Not authenticated")
if not self.has_permission(user['roles'], permission):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, **kwargs)
return wrapper
return decorator
# Secure Agent API
class SecureAgentAPI:
"""Agent API with authentication"""
def __init__(self):
self.app = FastAPI()
self.auth = AuthManager(secret_key="your-secret-key")
self.rbac = RBACManager()
self.setup_routes()
def setup_routes(self):
"""Setup secure routes"""
@self.app.post("/auth/login")
async def login(credentials: LoginRequest):
"""Login and get token"""
# Verify credentials (simplified)
if self.verify_credentials(credentials.username, credentials.password):
token = self.auth.create_token(
user_id=credentials.username,
roles=["user"]
)
return {"token": token}
else:
raise HTTPException(status_code=401, detail="Invalid credentials")
@self.app.post("/agent/process")
async def process(
request: AgentRequest,
current_user: Dict = Depends(self.auth.get_current_user)
):
"""Process request (requires authentication)"""
# Check permission
if not self.rbac.has_permission(current_user['roles'], 'write'):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# Process request
result = await self.process_request(request, current_user)
return {"result": result}
def verify_credentials(self, username: str, password: str) -> bool:
"""Verify user credentials"""
# In production, check against database with hashed passwords
return True
class LoginRequest(BaseModel):
username: str
password: str
# Usage
api = SecureAgentAPI()
API Key Management
import secrets
import hashlib
from datetime import datetime
class APIKeyManager:
"""Manage API keys"""
def __init__(self):
self.keys = {} # In production, use database
def generate_key(self, user_id: str, name: str) -> str:
"""Generate new API key"""
# Generate secure random key
key = f"sk_{secrets.token_urlsafe(32)}"
# Hash for storage
key_hash = hashlib.sha256(key.encode()).hexdigest()
# Store
self.keys[key_hash] = {
"user_id": user_id,
"name": name,
"created_at": datetime.utcnow(),
"last_used": None,
"usage_count": 0
}
return key
def verify_key(self, key: str) -> Optional[Dict]:
"""Verify API key"""
key_hash = hashlib.sha256(key.encode()).hexdigest()
if key_hash in self.keys:
# Update usage
self.keys[key_hash]["last_used"] = datetime.utcnow()
self.keys[key_hash]["usage_count"] += 1
return self.keys[key_hash]
return None
def revoke_key(self, key: str):
"""Revoke API key"""
key_hash = hashlib.sha256(key.encode()).hexdigest()
if key_hash in self.keys:
del self.keys[key_hash]
return True
return False
# API Key Authentication
from fastapi.security import APIKeyHeader
class APIKeyAuth:
"""API Key authentication"""
def __init__(self, key_manager: APIKeyManager):
self.key_manager = key_manager
self.api_key_header = APIKeyHeader(name="X-API-Key")
async def verify(self, api_key: str = Security(APIKeyHeader(name="X-API-Key"))):
"""Verify API key"""
key_data = self.key_manager.verify_key(api_key)
if not key_data:
raise HTTPException(status_code=401, detail="Invalid API key")
return key_data
# Usage
key_manager = APIKeyManager()
api_key = key_manager.generate_key("user123", "Production Key")
print(f"API Key: {api_key}")
Data Encryption
Encryption at Rest
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import base64
class DataEncryption:
"""Encrypt sensitive data"""
def __init__(self, password: str):
self.key = self.derive_key(password)
self.cipher = Fernet(self.key)
def derive_key(self, password: str) -> bytes:
"""Derive encryption key from password"""
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=b'static_salt', # In production, use random salt
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def encrypt(self, data: str) -> str:
"""Encrypt data"""
encrypted = self.cipher.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted).decode()
def decrypt(self, encrypted_data: str) -> str:
"""Decrypt data"""
encrypted = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted = self.cipher.decrypt(encrypted)
return decrypted.decode()
# Encrypted Storage
class EncryptedStorage:
"""Store data with encryption"""
def __init__(self, encryption_key: str):
self.encryption = DataEncryption(encryption_key)
self.storage = {}
def store(self, key: str, value: str):
"""Store encrypted data"""
encrypted_value = self.encryption.encrypt(value)
self.storage[key] = encrypted_value
def retrieve(self, key: str) -> Optional[str]:
"""Retrieve and decrypt data"""
encrypted_value = self.storage.get(key)
if encrypted_value:
return self.encryption.decrypt(encrypted_value)
return None
# Usage
storage = EncryptedStorage("my-secret-password")
storage.store("api_key", "sk_1234567890")
retrieved = storage.retrieve("api_key")
print(f"Retrieved: {retrieved}")
Encryption in Transit (TLS/SSL)
import ssl
from fastapi import FastAPI
import uvicorn
class SecureServer:
"""HTTPS server with TLS"""
def __init__(self):
self.app = FastAPI()
self.setup_routes()
def setup_routes(self):
"""Setup routes"""
@self.app.get("/")
async def root():
return {"message": "Secure server"}
def run(self,
host: str = "0.0.0.0",
port: int = 443,
cert_file: str = "cert.pem",
key_file: str = "key.pem"):
"""Run with TLS"""
uvicorn.run(
self.app,
host=host,
port=port,
ssl_keyfile=key_file,
ssl_certfile=cert_file,
ssl_version=ssl.PROTOCOL_TLS,
ssl_cert_reqs=ssl.CERT_REQUIRED
)
# Generate self-signed certificate (for development only)
def generate_self_signed_cert():
"""Generate self-signed certificate"""
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
# Generate certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Agent System"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(days=365)
).sign(private_key, hashes.SHA256())
return private_key, cert
Audit Logging
Comprehensive Audit System
import logging
from datetime import datetime
from typing import Optional
import json
class AuditLogger:
"""Audit logging system"""
def __init__(self, log_file: str = "audit.log"):
self.logger = logging.getLogger("audit")
self.logger.setLevel(logging.INFO)
# File handler
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_event(self,
event_type: str,
user_id: str,
action: str,
resource: str,
result: str,
metadata: Optional[Dict] = None):
"""Log audit event"""
event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": user_id,
"action": action,
"resource": resource,
"result": result,
"metadata": metadata or {},
"ip_address": self.get_client_ip()
}
self.logger.info(json.dumps(event))
def log_access(self, user_id: str, resource: str, granted: bool):
"""Log access attempt"""
self.log_event(
event_type="access",
user_id=user_id,
action="access",
resource=resource,
result="granted" if granted else "denied"
)
def log_data_access(self, user_id: str, data_type: str, operation: str):
"""Log data access"""
self.log_event(
event_type="data_access",
user_id=user_id,
action=operation,
resource=data_type,
result="success"
)
def log_security_event(self, user_id: str, event: str, severity: str):
"""Log security event"""
self.log_event(
event_type="security",
user_id=user_id,
action=event,
resource="system",
result=severity,
metadata={"severity": severity}
)
def get_client_ip(self) -> str:
"""Get client IP address"""
# In production, extract from request
return "0.0.0.0"
# Audit Middleware
class AuditMiddleware:
"""Middleware for automatic audit logging"""
def __init__(self, audit_logger: AuditLogger):
self.audit_logger = audit_logger
async def __call__(self, request, call_next):
"""Process request with audit logging"""
# Log request
user_id = request.state.user_id if hasattr(request.state, 'user_id') else "anonymous"
self.audit_logger.log_event(
event_type="api_request",
user_id=user_id,
action=request.method,
resource=request.url.path,
result="started"
)
# Process request
try:
response = await call_next(request)
# Log success
self.audit_logger.log_event(
event_type="api_request",
user_id=user_id,
action=request.method,
resource=request.url.path,
result="success",
metadata={"status_code": response.status_code}
)
return response
except Exception as e:
# Log failure
self.audit_logger.log_event(
event_type="api_request",
user_id=user_id,
action=request.method,
resource=request.url.path,
result="error",
metadata={"error": str(e)}
)
raise
# Usage
audit_logger = AuditLogger()
audit_logger.log_access("user123", "/agent/process", granted=True)
audit_logger.log_security_event("user456", "failed_login", "warning")
Regulatory Considerations
GDPR Compliance
class GDPRCompliance:
"""GDPR compliance features"""
def __init__(self):
self.data_store = {}
self.consent_records = {}
self.audit_logger = AuditLogger()
def collect_consent(self, user_id: str, purposes: List[str]) -> bool:
"""Collect user consent"""
self.consent_records[user_id] = {
"purposes": purposes,
"timestamp": datetime.utcnow(),
"version": "1.0"
}
self.audit_logger.log_event(
event_type="consent",
user_id=user_id,
action="collect",
resource="consent",
result="success",
metadata={"purposes": purposes}
)
return True
def check_consent(self, user_id: str, purpose: str) -> bool:
"""Check if user has consented"""
consent = self.consent_records.get(user_id)
if not consent:
return False
return purpose in consent["purposes"]
def export_user_data(self, user_id: str) -> Dict:
"""Export all user data (right to data portability)"""
self.audit_logger.log_event(
event_type="data_export",
user_id=user_id,
action="export",
resource="user_data",
result="success"
)
# Collect all user data
user_data = {
"user_id": user_id,
"data": self.data_store.get(user_id, {}),
"consent": self.consent_records.get(user_id, {}),
"exported_at": datetime.utcnow().isoformat()
}
return user_data
def delete_user_data(self, user_id: str) -> bool:
"""Delete all user data (right to be forgotten)"""
self.audit_logger.log_event(
event_type="data_deletion",
user_id=user_id,
action="delete",
resource="user_data",
result="success"
)
# Delete all user data
if user_id in self.data_store:
del self.data_store[user_id]
if user_id in self.consent_records:
del self.consent_records[user_id]
return True
def anonymize_data(self, user_id: str) -> bool:
"""Anonymize user data"""
if user_id in self.data_store:
# Replace with anonymized version
self.data_store[f"anon_{hash(user_id)}"] = self.data_store[user_id]
del self.data_store[user_id]
return True
# Usage
gdpr = GDPRCompliance()
# Collect consent
gdpr.collect_consent("user123", ["analytics", "personalization"])
# Check consent
has_consent = gdpr.check_consent("user123", "analytics")
# Export data
user_data = gdpr.export_user_data("user123")
# Delete data
gdpr.delete_user_data("user123")
SOC 2 Compliance
class SOC2Compliance:
"""SOC 2 compliance controls"""
def __init__(self):
self.audit_logger = AuditLogger()
self.access_controls = RBACManager()
def implement_access_controls(self):
"""Implement access controls (Security)"""
# Already implemented via RBAC
pass
def monitor_availability(self) -> Dict:
"""Monitor system availability (Availability)"""
# Check service health
health_status = {
"agent_service": self.check_service_health("agent"),
"tool_service": self.check_service_health("tools"),
"memory_service": self.check_service_health("memory")
}
uptime = sum(1 for status in health_status.values() if status) / len(health_status)
return {
"uptime_percentage": uptime * 100,
"services": health_status
}
def ensure_processing_integrity(self, data: Dict) -> bool:
"""Ensure processing integrity (Processing Integrity)"""
# Validate data
if not self.validate_data(data):
return False
# Log processing
self.audit_logger.log_event(
event_type="data_processing",
user_id=data.get("user_id", "system"),
action="process",
resource="data",
result="success"
)
return True
def protect_confidentiality(self, data: str) -> str:
"""Protect data confidentiality (Confidentiality)"""
encryption = DataEncryption("secret-key")
return encryption.encrypt(data)
def maintain_privacy(self, user_id: str) -> bool:
"""Maintain privacy (Privacy)"""
# Implement privacy controls
gdpr = GDPRCompliance()
# Check consent
has_consent = gdpr.check_consent(user_id, "data_processing")
if not has_consent:
return False
return True
def check_service_health(self, service: str) -> bool:
"""Check service health"""
# In production, actually check service
return True
def validate_data(self, data: Dict) -> bool:
"""Validate data integrity"""
# Implement validation logic
return True
Best Practices
- Authentication: Always authenticate users
- Authorization: Implement least privilege
- Encryption: Encrypt sensitive data
- Audit logging: Log all security events
- Input validation: Validate all inputs
- Rate limiting: Prevent abuse
- Security headers: Use proper HTTP headers
- Regular updates: Keep dependencies updated
- Security testing: Regular penetration testing
- Incident response: Have a plan
Next Steps
You now understand security and compliance! Next, we’ll explore cost optimization strategies for production agent systems.