""" Base configuration class for BackgroundFX Pro. Provides a flexible, environment-aware configuration system with validation, type checking, and automatic loading from multiple sources. """ import os import json import yaml from pathlib import Path from typing import Any, Dict, Optional, List, Union, Type, TypeVar from dataclasses import dataclass, field, asdict from abc import ABC, abstractmethod import logging from dotenv import load_dotenv from pydantic import BaseModel, ValidationError, Field logger = logging.getLogger(__name__) T = TypeVar('T', bound='BaseConfig') class ConfigurationError(Exception): """Raised when configuration is invalid or missing.""" pass class ConfigSource(ABC): """Abstract base class for configuration sources.""" @abstractmethod def load(self) -> Dict[str, Any]: """Load configuration from source.""" pass @abstractmethod def exists(self) -> bool: """Check if configuration source exists.""" pass class EnvFileSource(ConfigSource): """Load configuration from .env file.""" def __init__(self, path: str = ".env"): self.path = Path(path) def exists(self) -> bool: return self.path.exists() def load(self) -> Dict[str, Any]: if not self.exists(): return {} load_dotenv(self.path) return dict(os.environ) class JSONFileSource(ConfigSource): """Load configuration from JSON file.""" def __init__(self, path: str): self.path = Path(path) def exists(self) -> bool: return self.path.exists() def load(self) -> Dict[str, Any]: if not self.exists(): return {} with open(self.path, 'r') as f: return json.load(f) class YAMLFileSource(ConfigSource): """Load configuration from YAML file.""" def __init__(self, path: str): self.path = Path(path) def exists(self) -> bool: return self.path.exists() def load(self) -> Dict[str, Any]: if not self.exists(): return {} with open(self.path, 'r') as f: return yaml.safe_load(f) class EnvironmentSource(ConfigSource): """Load configuration from environment variables.""" def __init__(self, prefix: str = "BACKGROUNDFX_"): self.prefix = prefix def exists(self) -> bool: return True def load(self) -> Dict[str, Any]: config = {} for key, value in os.environ.items(): if key.startswith(self.prefix): # Remove prefix and convert to lowercase clean_key = key[len(self.prefix):].lower() config[clean_key] = value return config @dataclass class BaseConfig: """ Base configuration class with common settings. Provides methods for loading, validating, and merging configurations from multiple sources. """ # Application settings app_name: str = "BackgroundFX Pro" app_version: str = "1.0.0" environment: str = field(default_factory=lambda: os.getenv("ENVIRONMENT", "development")) debug: bool = field(default_factory=lambda: os.getenv("DEBUG", "false").lower() == "true") # Server settings host: str = "0.0.0.0" port: int = 8000 workers: int = 4 reload: bool = False # Paths base_dir: Path = field(default_factory=lambda: Path(__file__).parent.parent) data_dir: Path = field(default_factory=lambda: Path("data")) temp_dir: Path = field(default_factory=lambda: Path("/tmp/backgroundfx")) log_dir: Path = field(default_factory=lambda: Path("logs")) # Logging log_level: str = "INFO" log_format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" log_file: Optional[str] = None # Security secret_key: str = field(default_factory=lambda: os.getenv("SECRET_KEY", "change-me-in-production")) api_key_header: str = "X-API-Key" cors_origins: List[str] = field(default_factory=lambda: ["*"]) # Rate limiting rate_limit_enabled: bool = True rate_limit_requests: int = 100 rate_limit_window: int = 3600 # seconds # Feature flags enable_video_processing: bool = True enable_batch_processing: bool = True enable_ai_backgrounds: bool = True enable_webhooks: bool = True _sources: List[ConfigSource] = field(default_factory=list, init=False, repr=False) _validators: List[callable] = field(default_factory=list, init=False, repr=False) def __post_init__(self): """Initialize configuration after dataclass creation.""" self.setup_directories() self.validate() def setup_directories(self): """Create necessary directories if they don't exist.""" for dir_attr in ['data_dir', 'temp_dir', 'log_dir']: dir_path = getattr(self, dir_attr) if isinstance(dir_path, str): dir_path = Path(dir_path) setattr(self, dir_attr, dir_path) dir_path.mkdir(parents=True, exist_ok=True) def add_source(self, source: ConfigSource): """Add a configuration source.""" self._sources.append(source) def add_validator(self, validator: callable): """Add a configuration validator.""" self._validators.append(validator) def load_from_sources(self): """Load configuration from all registered sources.""" merged_config = {} for source in self._sources: if source.exists(): try: config = source.load() merged_config.update(config) logger.debug(f"Loaded configuration from {source.__class__.__name__}") except Exception as e: logger.warning(f"Failed to load from {source.__class__.__name__}: {e}") # Update instance attributes for key, value in merged_config.items(): if hasattr(self, key): # Type conversion attr_type = type(getattr(self, key)) try: if attr_type == bool: value = str(value).lower() in ('true', '1', 'yes', 'on') elif attr_type == Path: value = Path(value) else: value = attr_type(value) setattr(self, key, value) except (ValueError, TypeError) as e: logger.warning(f"Failed to set {key}: {e}") def validate(self): """Validate configuration.""" errors = [] # Run custom validators for validator in self._validators: try: validator(self) except Exception as e: errors.append(str(e)) # Basic validation if not self.secret_key or self.secret_key == "change-me-in-production": if self.environment == "production": errors.append("SECRET_KEY must be set in production") if self.port < 1 or self.port > 65535: errors.append(f"Invalid port: {self.port}") if self.workers < 1: errors.append(f"Invalid workers count: {self.workers}") if self.rate_limit_requests < 1: errors.append(f"Invalid rate limit: {self.rate_limit_requests}") if errors: raise ConfigurationError("\n".join(errors)) def to_dict(self) -> Dict[str, Any]: """Convert configuration to dictionary.""" return { k: str(v) if isinstance(v, Path) else v for k, v in asdict(self).items() if not k.startswith('_') } def to_json(self, indent: int = 2) -> str: """Convert configuration to JSON string.""" return json.dumps(self.to_dict(), indent=indent, default=str) def to_yaml(self) -> str: """Convert configuration to YAML string.""" return yaml.dump(self.to_dict(), default_flow_style=False) def save(self, path: Union[str, Path], format: str = "json"): """Save configuration to file.""" path = Path(path) if format == "json": content = self.to_json() elif format == "yaml": content = self.to_yaml() elif format == "env": content = self.to_env() else: raise ValueError(f"Unsupported format: {format}") with open(path, 'w') as f: f.write(content) def to_env(self) -> str: """Convert configuration to .env format.""" lines = [] for key, value in self.to_dict().items(): if value is not None: env_key = f"BACKGROUNDFX_{key.upper()}" if isinstance(value, (list, dict)): value = json.dumps(value) lines.append(f"{env_key}={value}") return "\n".join(lines) @classmethod def from_file(cls: Type[T], path: Union[str, Path]) -> T: """Load configuration from file.""" path = Path(path) if not path.exists(): raise ConfigurationError(f"Configuration file not found: {path}") config = cls() if path.suffix == ".json": config.add_source(JSONFileSource(str(path))) elif path.suffix in (".yaml", ".yml"): config.add_source(YAMLFileSource(str(path))) elif path.suffix == ".env": config.add_source(EnvFileSource(str(path))) else: raise ConfigurationError(f"Unsupported file format: {path.suffix}") config.load_from_sources() return config @classmethod def from_env(cls: Type[T], prefix: str = "BACKGROUNDFX_") -> T: """Load configuration from environment variables.""" config = cls() config.add_source(EnvironmentSource(prefix)) config.load_from_sources() return config def merge(self, other: 'BaseConfig'): """Merge another configuration into this one.""" for key, value in other.to_dict().items(): if hasattr(self, key): setattr(self, key, value) def get_section(self, section: str) -> Dict[str, Any]: """Get configuration section as dictionary.""" result = {} prefix = f"{section}_" for key, value in self.to_dict().items(): if key.startswith(prefix): clean_key = key[len(prefix):] result[clean_key] = value return result def __repr__(self) -> str: """String representation of configuration.""" items = [] for key, value in self.to_dict().items(): if 'secret' in key.lower() or 'password' in key.lower() or 'key' in key.lower(): value = "***" items.append(f"{key}={value}") return f"{self.__class__.__name__}({', '.join(items[:5])}...)" class ConfigManager: """ Manages multiple configuration instances and environments. """ def __init__(self): self._configs: Dict[str, BaseConfig] = {} self._active: Optional[str] = None def register(self, name: str, config: BaseConfig): """Register a configuration.""" self._configs[name] = config if self._active is None: self._active = name def get(self, name: Optional[str] = None) -> BaseConfig: """Get configuration by name or active configuration.""" name = name or self._active if name not in self._configs: raise ConfigurationError(f"Configuration not found: {name}") return self._configs[name] def set_active(self, name: str): """Set active configuration.""" if name not in self._configs: raise ConfigurationError(f"Configuration not found: {name}") self._active = name def reload(self, name: Optional[str] = None): """Reload configuration from sources.""" config = self.get(name) config.load_from_sources() config.validate() @property def active(self) -> BaseConfig: """Get active configuration.""" return self.get() # Global configuration manager config_manager = ConfigManager()