|
|
"""Zero-Trust Permission Validation System""" |
|
|
|
|
|
import json |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Dict, Any, List, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
def load_permission_matrix() -> Dict[str, Any]: |
|
|
"""Load permission matrix from JSON""" |
|
|
matrix_path = Path(__file__).parent.parent / "data" / "permission_matrix.json" |
|
|
with open(matrix_path, 'r') as f: |
|
|
return json.load(f) |
|
|
|
|
|
def get_agent_role(agent_id: str) -> Optional[str]: |
|
|
""" |
|
|
Extract role from agent_id |
|
|
Expected format: role-name-01, role-name-02, etc. |
|
|
""" |
|
|
if not agent_id: |
|
|
return "guest-agent" |
|
|
|
|
|
|
|
|
parts = agent_id.rsplit('-', 1) |
|
|
if len(parts) == 2 and parts[1].isdigit(): |
|
|
return parts[0] |
|
|
return agent_id |
|
|
|
|
|
def check_pattern_match(resource: str, patterns: List[str]) -> bool: |
|
|
""" |
|
|
Check if resource matches any of the allowed patterns |
|
|
Supports wildcards like database:*:read or filesystem:/tmp/*:write |
|
|
""" |
|
|
for pattern in patterns: |
|
|
|
|
|
regex_pattern = pattern.replace('*', '.*').replace(':', r'\:') |
|
|
if re.match(f"^{regex_pattern}$", resource): |
|
|
return True |
|
|
return False |
|
|
|
|
|
def check_always_deny(action: str, resource: str) -> tuple[bool, Optional[str]]: |
|
|
"""Check if action is in always_deny list""" |
|
|
matrix = load_permission_matrix() |
|
|
always_deny = matrix['default_policies']['always_deny'] |
|
|
|
|
|
for denied_pattern in always_deny: |
|
|
|
|
|
if '*' in denied_pattern: |
|
|
pattern = denied_pattern.replace('*', '.*') |
|
|
if re.match(f"^{pattern}$", action): |
|
|
return True, f"Action '{action}' is globally denied" |
|
|
elif action == denied_pattern: |
|
|
return True, f"Action '{action}' is globally denied" |
|
|
|
|
|
return False, None |
|
|
|
|
|
def check_requires_approval(action: str, resource: str) -> bool: |
|
|
"""Check if action requires human approval""" |
|
|
matrix = load_permission_matrix() |
|
|
require_approval = matrix['default_policies']['require_approval_for'] |
|
|
|
|
|
for approval_pattern in require_approval: |
|
|
if '*' in approval_pattern: |
|
|
pattern = approval_pattern.replace('*', '.*') |
|
|
if re.match(f"^{pattern}$", action): |
|
|
return True |
|
|
elif action == approval_pattern: |
|
|
return True |
|
|
|
|
|
|
|
|
sensitive_keywords = ['secret', 'credential', 'password', 'token', 'key', 'payment'] |
|
|
resource_lower = resource.lower() |
|
|
if any(keyword in resource_lower for keyword in sensitive_keywords): |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
def validate_permissions( |
|
|
agent_id: str, |
|
|
action: str, |
|
|
resource: str, |
|
|
current_permissions: Optional[List[str]] = None, |
|
|
request_context: Optional[Dict[str, Any]] = None |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Zero-trust permission validation |
|
|
|
|
|
Args: |
|
|
agent_id: Unique identifier for the agent |
|
|
action: The action being attempted (e.g., "read_file", "execute_code") |
|
|
resource: The target resource (e.g., "/etc/passwd", "database:users") |
|
|
current_permissions: Agent's current permission set (optional) |
|
|
request_context: Additional context (IP, session_id, timestamp) |
|
|
|
|
|
Returns: |
|
|
Validation result with decision and recommendations |
|
|
""" |
|
|
matrix = load_permission_matrix() |
|
|
|
|
|
|
|
|
is_denied, deny_reason = check_always_deny(action, resource) |
|
|
if is_denied: |
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "DENY", |
|
|
"reason": deny_reason, |
|
|
"agent_role": "unknown", |
|
|
"required_permissions": [], |
|
|
"current_permissions": current_permissions or [], |
|
|
"permission_gap": [], |
|
|
"recommendations": ["This action is prohibited by security policy"], |
|
|
"escalation_path": "Contact security-admin@company.com", |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|
|
|
|
|
|
role = get_agent_role(agent_id) |
|
|
|
|
|
|
|
|
if role not in matrix['roles']: |
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "DENY", |
|
|
"reason": f"Unknown agent role: '{role}'", |
|
|
"agent_role": role, |
|
|
"required_permissions": [], |
|
|
"current_permissions": current_permissions or [], |
|
|
"permission_gap": [], |
|
|
"recommendations": ["Register agent with valid role in permission matrix"], |
|
|
"escalation_path": "Contact admin to configure agent permissions", |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|
|
|
role_config = matrix['roles'][role] |
|
|
|
|
|
|
|
|
if action in role_config.get('denied_actions', []): |
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "DENY", |
|
|
"reason": f"Agent role '{role}' explicitly denies action '{action}'", |
|
|
"agent_role": role, |
|
|
"required_permissions": [], |
|
|
"current_permissions": current_permissions or [], |
|
|
"permission_gap": [f"{action} on {resource}"], |
|
|
"recommendations": [ |
|
|
"This action is not permitted for your role", |
|
|
"Request role change if elevated access is needed" |
|
|
], |
|
|
"escalation_path": "Contact security-admin@company.com", |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|
|
|
|
|
|
if action not in role_config['allowed_actions']: |
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "DENY", |
|
|
"reason": f"Action '{action}' not in allowed actions for role '{role}'", |
|
|
"agent_role": role, |
|
|
"required_permissions": [f"{action}:{resource}"], |
|
|
"current_permissions": role_config['allowed_actions'], |
|
|
"permission_gap": [action], |
|
|
"recommendations": [ |
|
|
"Request permission addition from administrator", |
|
|
"Use alternative action within your current permissions" |
|
|
], |
|
|
"escalation_path": "Submit permission request at /admin/permissions", |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|
|
|
|
|
|
resource_allowed = check_pattern_match(resource, role_config['resource_patterns']) |
|
|
|
|
|
if not resource_allowed: |
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "DENY", |
|
|
"reason": f"Resource '{resource}' does not match allowed patterns for role '{role}'", |
|
|
"agent_role": role, |
|
|
"required_permissions": [f"{action}:{resource}"], |
|
|
"current_permissions": role_config['resource_patterns'], |
|
|
"permission_gap": [f"access to {resource}"], |
|
|
"recommendations": [ |
|
|
"Verify resource path is correct", |
|
|
"Request access to this resource pattern" |
|
|
], |
|
|
"escalation_path": "Contact security-admin@company.com", |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|
|
|
|
|
|
requires_approval = check_requires_approval(action, resource) |
|
|
|
|
|
from .audit import generate_audit_id |
|
|
audit_id = generate_audit_id("perm") |
|
|
|
|
|
if requires_approval: |
|
|
return { |
|
|
"allowed": False, |
|
|
"decision": "REQUIRES_APPROVAL", |
|
|
"reason": f"Action '{action}' on '{resource}' requires human approval", |
|
|
"agent_role": role, |
|
|
"required_permissions": [f"{action}:{resource}"], |
|
|
"current_permissions": role_config['allowed_actions'], |
|
|
"permission_gap": ["human approval"], |
|
|
"recommendations": [ |
|
|
"Submit approval request with justification", |
|
|
"Approval required due to sensitive action/resource" |
|
|
], |
|
|
"escalation_path": "Submit at /admin/approval-requests", |
|
|
"audit_id": audit_id, |
|
|
"approval_required": True |
|
|
} |
|
|
|
|
|
|
|
|
return { |
|
|
"allowed": True, |
|
|
"decision": "ALLOW", |
|
|
"reason": f"Agent '{agent_id}' has valid permissions for '{action}' on '{resource}'", |
|
|
"agent_role": role, |
|
|
"required_permissions": [f"{action}:{resource}"], |
|
|
"current_permissions": role_config['allowed_actions'], |
|
|
"permission_gap": [], |
|
|
"recommendations": [], |
|
|
"escalation_path": None, |
|
|
"audit_id": audit_id |
|
|
} |
|
|
|