Skip to content

Auto-Generated API Reference

This page provides auto-generated documentation from the source code docstrings using mkdocstrings.

For hand-written endpoint guides, see the other pages in the API Reference section.


Configuration

file_organizer.api.config

API configuration and settings loader.

Classes

ApiSettings

Bases: BaseModel

Settings for the FastAPI backend.

Functions

load_settings()

Load API settings from a config file and environment variables.

Environment variables override config file values when present.

Source code in src/file_organizer/api/config.py
def load_settings() -> ApiSettings:
    """Load API settings from a config file and environment variables.

    Environment variables override config file values when present.

    Returns:
        A validated ApiSettings instance.

    Raises:
        ValueError: If auth is enabled outside development with the default
            JWT secret, or if production CORS origins are wildcard/localhost.
    """
    config_path = os.environ.get("FO_API_CONFIG_PATH")
    data: dict[str, Any] = {}

    if config_path:
        # Config file path is provided by deployment configuration, not request data.
        path = Path(config_path).expanduser()  # codeql[py/path-injection]
        if path.exists():
            # A file may either nest settings under an "api" key or be flat.
            payload = _load_yaml(path)
            data.update(payload.get("api", payload))
        else:
            logger.warning("API config path does not exist: {}", path)

    env = os.environ

    # The helpers below close over `env` and `data` and implement the single
    # repeated pattern: if the variable is set, parse it and override the
    # config-file value. Invalid numeric values are logged and skipped so the
    # config-file (or model default) value survives.

    def set_str(var: str, field: str) -> None:
        # Copy a raw string env var into the settings payload.
        if var in env:
            data[field] = env[var]

    def set_bool(var: str, field: str) -> None:
        # Truthy spellings accepted everywhere in this loader: 1/true/yes.
        if var in env:
            data[field] = env[var].lower() in ("1", "true", "yes")

    def set_list(var: str, field: str) -> None:
        # Comma-separated lists are parsed by the shared _parse_list helper.
        if var in env:
            data[field] = _parse_list(env[var])

    def set_int(var: str, field: str, *, minimum: int = 0) -> None:
        # Parse an integer env var; `minimum` > 0 additionally enforces a
        # strictly-positive lower bound (currently only the WS ping interval).
        if var not in env:
            return
        try:
            value = int(env[var])
        except ValueError:
            logger.warning("Invalid {} value: {}", var, env[var])
            return
        if minimum and value < minimum:
            logger.warning("Invalid {} value (must be > 0): {}", var, env[var])
            return
        data[field] = value

    set_str("FO_API_APP_NAME", "app_name")
    set_str("FO_API_VERSION", "version")
    set_str("FO_API_ENVIRONMENT", "environment")
    set_str("FO_API_HOST", "host")
    set_int("FO_API_PORT", "port")
    set_str("FO_API_LOG_LEVEL", "log_level")
    set_list("FO_API_CORS_ORIGINS", "cors_origins")
    set_list("FO_API_CORS_ALLOW_METHODS", "cors_allow_methods")
    set_list("FO_API_CORS_ALLOW_HEADERS", "cors_allow_headers")
    set_bool("FO_API_CORS_ALLOW_CREDENTIALS", "cors_allow_credentials")
    set_bool("FO_API_ENABLE_DOCS", "enable_docs")
    set_list("FO_API_ALLOWED_PATHS", "allowed_paths")
    set_int("FO_API_WS_PING_INTERVAL", "websocket_ping_interval", minimum=1)
    set_str("FO_API_WEBSOCKET_TOKEN", "websocket_token")
    set_bool("FO_API_AUTH_ENABLED", "auth_enabled")
    set_str("FO_API_AUTH_DB_PATH", "auth_db_path")
    set_str("FO_API_AUTH_JWT_SECRET", "auth_jwt_secret")
    set_str("FO_API_AUTH_JWT_ALGORITHM", "auth_jwt_algorithm")
    set_int("FO_API_AUTH_ACCESS_MINUTES", "auth_access_token_minutes")
    set_int("FO_API_AUTH_REFRESH_DAYS", "auth_refresh_token_days")

    # The dedicated auth Redis URL wins over the shared FO_REDIS_URL fallback.
    if "FO_API_AUTH_REDIS_URL" in env:
        data["auth_redis_url"] = env["FO_API_AUTH_REDIS_URL"]
    elif "FO_REDIS_URL" in env:
        data["auth_redis_url"] = env["FO_REDIS_URL"]

    set_bool("FO_API_AUTH_LOGIN_RATE_LIMIT", "auth_login_rate_limit_enabled")
    set_int("FO_API_AUTH_LOGIN_MAX_ATTEMPTS", "auth_login_max_attempts")
    set_int("FO_API_AUTH_LOGIN_WINDOW_SECONDS", "auth_login_window_seconds")
    set_int("FO_API_AUTH_PASSWORD_MIN_LENGTH", "auth_password_min_length")
    set_bool("FO_API_AUTH_PASSWORD_REQUIRE_NUMBER", "auth_password_require_number")
    set_bool("FO_API_AUTH_PASSWORD_REQUIRE_LETTER", "auth_password_require_letter")
    set_bool("FO_API_AUTH_PASSWORD_REQUIRE_SPECIAL", "auth_password_require_special")
    set_bool("FO_API_AUTH_PASSWORD_REQUIRE_UPPERCASE", "auth_password_require_uppercase")
    set_bool("FO_API_AUTH_BOOTSTRAP_ADMIN", "auth_bootstrap_admin")
    set_bool("FO_API_AUTH_BOOTSTRAP_LOCAL_ONLY", "auth_bootstrap_admin_local_only")
    set_str("FO_API_DATABASE_URL", "database_url")
    set_int("FO_API_DB_POOL_SIZE", "database_pool_size")
    set_int("FO_API_DB_MAX_OVERFLOW", "database_max_overflow")
    set_bool("FO_API_DB_POOL_PRE_PING", "database_pool_pre_ping")
    set_int("FO_API_DB_POOL_RECYCLE_SECONDS", "database_pool_recycle_seconds")
    set_bool("FO_API_DB_ECHO", "database_echo")

    # Cache Redis URL also falls back to the shared FO_REDIS_URL.
    if "FO_API_CACHE_REDIS_URL" in env:
        data["cache_redis_url"] = env["FO_API_CACHE_REDIS_URL"]
    elif "FO_REDIS_URL" in env:
        data["cache_redis_url"] = env["FO_REDIS_URL"]

    set_int("FO_API_CACHE_TTL_SECONDS", "cache_default_ttl_seconds")
    set_bool("FO_API_API_KEY_ENABLED", "api_key_enabled")
    set_bool("FO_API_API_KEY_ADMIN", "api_key_admin")
    set_str("FO_API_API_KEY_HEADER", "api_key_header")
    if "FO_API_API_KEYS" in env:
        # Plaintext keys are hashed immediately; only hashes are stored.
        raw_keys = _parse_list(env["FO_API_API_KEYS"])
        data["api_key_hashes"] = [hash_api_key(key) for key in raw_keys]
    # Explicit hashes (set after FO_API_API_KEYS) take precedence over hashed plaintext keys.
    set_list("FO_API_API_KEY_HASHES", "api_key_hashes")
    set_bool("FO_API_RATE_LIMIT_ENABLED", "rate_limit_enabled")
    set_int("FO_API_RATE_LIMIT_DEFAULT_REQUESTS", "rate_limit_default_requests")
    set_int("FO_API_RATE_LIMIT_DEFAULT_WINDOW_SECONDS", "rate_limit_default_window_seconds")
    set_bool("FO_API_RATE_LIMIT_TRUST_PROXY_HEADERS", "rate_limit_trust_proxy_headers")
    set_list("FO_API_RATE_LIMIT_EXEMPT_PATHS", "rate_limit_exempt_paths")
    if "FO_API_RATE_LIMIT_RULES" in env:
        # Rules are a JSON object; any other JSON type is silently ignored.
        try:
            parsed_rules = json.loads(env["FO_API_RATE_LIMIT_RULES"])
            if isinstance(parsed_rules, dict):
                data["rate_limit_rules"] = parsed_rules
        except json.JSONDecodeError:
            logger.warning("Invalid FO_API_RATE_LIMIT_RULES JSON value")
    set_bool("FO_API_SECURITY_HEADERS_ENABLED", "security_headers_enabled")
    set_str("FO_API_SECURITY_CSP", "security_csp")
    set_int("FO_API_SECURITY_HSTS_SECONDS", "security_hsts_seconds")
    set_bool("FO_API_SECURITY_HSTS_SUBDOMAINS", "security_hsts_subdomains")
    set_str("FO_API_SECURITY_REFERRER_POLICY", "security_referrer_policy")

    # Ollama URL: prefer the project-specific variable over the generic one.
    if "FO_OLLAMA_URL" in env:
        data["ollama_url"] = env["FO_OLLAMA_URL"]
    elif "OLLAMA_HOST" in env:
        data["ollama_url"] = env["OLLAMA_HOST"]

    # Remember whether the operator explicitly enabled API keys so we only
    # warn about missing keys when the feature was deliberately turned on.
    api_key_enabled_explicit = "api_key_enabled" in data
    settings = ApiSettings(**data)
    if settings.auth_enabled and settings.auth_jwt_secret.get_secret_value() == "change-me":
        if settings.environment.lower() in {"development", "test"}:
            logger.warning(
                "FO_API_AUTH_JWT_SECRET is using the default placeholder. "
                "Set FO_API_AUTH_JWT_SECRET before deploying."
            )
        else:
            raise ValueError(
                "FO_API_AUTH_JWT_SECRET must be set when auth is enabled outside development."
            )
    if settings.api_key_enabled and not settings.api_key_hashes and api_key_enabled_explicit:
        logger.warning("API key auth is enabled but no keys are configured.")
    if settings.environment.lower() not in {"development", "test"}:
        if "*" in settings.cors_origins:
            raise ValueError("CORS origins must be explicit in production.")
        if any("localhost" in origin or "127.0.0.1" in origin for origin in settings.cors_origins):
            raise ValueError("Localhost CORS origins must be removed in production.")
    return settings

Authentication

file_organizer.api.auth

Authentication helpers for JWT and password hashing.

Classes

TokenError

Bases: Exception

Raised when a JWT token is invalid.

TokenBundle(access_token, refresh_token, access_jti, refresh_jti, access_expires_at, refresh_expires_at) dataclass

Bundle of access and refresh tokens.

Functions

verify_password(plain_password, hashed_password)

Return True if plain_password matches hashed_password.

Source code in src/file_organizer/api/auth.py
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if plain_password matches hashed_password."""
    # Coerce the context's result to a plain bool for a stable return type.
    matches = _PWD_CONTEXT.verify(plain_password, hashed_password)
    return bool(matches)

hash_password(password)

Return the bcrypt hash of password.

Source code in src/file_organizer/api/auth.py
def hash_password(password: str) -> str:
    """Return the bcrypt hash of password."""
    return str(_PWD_CONTEXT.hash(password))

validate_password(password, settings)

Validate password strength based on API settings.

Source code in src/file_organizer/api/auth.py
def validate_password(password: str, settings: ApiSettings) -> tuple[bool, str]:
    """Validate password strength based on API settings.

    Returns a (valid, reason) pair: (True, "") on success, otherwise
    (False, <human-readable failure message>).
    """
    minimum = settings.auth_password_min_length
    if len(password) < minimum:
        return (
            False,
            f"Password must be at least {minimum} characters long",
        )
    # Each rule: (is it required by settings?, does the password satisfy it?, message).
    # Order matters: the first failing rule's message is returned.
    rules = (
        (
            settings.auth_password_require_letter,
            any(ch.isalpha() for ch in password),
            "Password must include at least one letter",
        ),
        (
            settings.auth_password_require_number,
            any(ch.isdigit() for ch in password),
            "Password must include at least one number",
        ),
        (
            settings.auth_password_require_uppercase,
            any(ch.isupper() for ch in password),
            "Password must include at least one uppercase letter",
        ),
        (
            settings.auth_password_require_special,
            any(ch in "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~" for ch in password),
            "Password must include at least one special character",
        ),
    )
    for required, satisfied, message in rules:
        if required and not satisfied:
            return False, message
    if password.lower() in _COMMON_PASSWORDS:
        return False, "Password is too common, please choose a more unique password"
    return True, ""

create_token_bundle(user_id, username, settings)

Create a new access and refresh token bundle for a user.

Source code in src/file_organizer/api/auth.py
def create_token_bundle(user_id: str, username: str, settings: ApiSettings) -> TokenBundle:
    """Create a new access and refresh token bundle for a user.

    Both tokens carry the same subject claims; lifetimes come from settings.
    """
    claims = {"sub": username, "user_id": user_id}
    access_lifetime = timedelta(minutes=settings.auth_access_token_minutes)
    refresh_lifetime = timedelta(days=settings.auth_refresh_token_days)
    access_token, access_jti, access_expires = _build_token(
        claims, _ACCESS_TOKEN_TYPE, access_lifetime, settings
    )
    refresh_token, refresh_jti, refresh_expires = _build_token(
        claims, _REFRESH_TOKEN_TYPE, refresh_lifetime, settings
    )
    return TokenBundle(
        access_token=access_token,
        refresh_token=refresh_token,
        access_jti=access_jti,
        refresh_jti=refresh_jti,
        access_expires_at=access_expires,
        refresh_expires_at=refresh_expires,
    )

decode_token(token, settings)

Decode and return the JWT payload, raising TokenError if invalid.

Source code in src/file_organizer/api/auth.py
def decode_token(token: str, settings: ApiSettings) -> dict[str, Any]:
    """Decode and return the JWT payload, raising TokenError if invalid."""
    secret = settings.auth_jwt_secret.get_secret_value()
    try:
        payload: dict[str, Any] = jwt.decode(
            token,
            secret,
            algorithms=[settings.auth_jwt_algorithm],
        )
    except JWTError as exc:
        # Collapse all library-level failures into the module's own error type.
        raise TokenError("Invalid token") from exc
    return payload

is_access_token(payload)

Return True if payload represents an access token.

Source code in src/file_organizer/api/auth.py
def is_access_token(payload: dict[str, Any]) -> bool:
    """Return True if payload represents an access token."""
    return payload.get("type") == _ACCESS_TOKEN_TYPE

is_refresh_token(payload)

Return True if payload represents a refresh token.

Source code in src/file_organizer/api/auth.py
def is_refresh_token(payload: dict[str, Any]) -> bool:
    """Return True if payload represents a refresh token."""
    return payload.get("type") == _REFRESH_TOKEN_TYPE

API Key Management

file_organizer.api.api_keys

API key helpers for external integrations.

Functions

generate_api_key(prefix='fo')

Generate a new API key.

Source code in src/file_organizer/api/api_keys.py
def generate_api_key(prefix: str = "fo") -> str:
    """Generate a new API key.

    The key has the shape "<prefix>_<8-hex-char id>_<url-safe secret>".
    The short id is non-secret and can be used to identify the key in logs.
    """
    identifier = secrets.token_hex(4)
    secret_part = secrets.token_urlsafe(32)
    return "_".join((prefix, identifier, secret_part))

hash_api_key(api_key)

Hash an API key for storage.

Source code in src/file_organizer/api/api_keys.py
def hash_api_key(api_key: str) -> str:
    """Hash an API key for storage."""
    return cast(str, _API_KEY_CONTEXT.hash(api_key))

match_api_key_hash(api_key, hashes)

Return the stored hash matching an API key, if any.

Source code in src/file_organizer/api/api_keys.py
def match_api_key_hash(api_key: str, hashes: Iterable[str]) -> str | None:
    """Return the stored hash matching an API key, if any."""
    for candidate in hashes:
        try:
            verified = _API_KEY_CONTEXT.verify(api_key, candidate)
        except (ValueError, TypeError):
            # Malformed stored hash: skip it instead of aborting the scan.
            continue
        if verified:
            return candidate
    return None

verify_api_key(api_key, hashes)

Verify an API key against stored hashes.

Source code in src/file_organizer/api/api_keys.py
def verify_api_key(api_key: str, hashes: Iterable[str]) -> bool:
    """Verify an API key against stored hashes."""
    matched = match_api_key_hash(api_key, hashes)
    return matched is not None

api_key_identifier(api_key, hashes)

Return a stable identifier derived from the stored hash.

Source code in src/file_organizer/api/api_keys.py
def api_key_identifier(api_key: str, hashes: Iterable[str]) -> str | None:
    """Return a stable identifier derived from the stored hash.

    Prefers the key's embedded id segment ("prefix_id_secret"); falls back
    to the tail of the matched hash when the key has no id segment.
    """
    matched = match_api_key_hash(api_key, hashes)
    if not matched:
        return None
    pieces = api_key.split("_", 2)
    if len(pieces) == 3 and pieces[1]:
        return pieces[1]
    return matched[-12:]

Rate Limiting

file_organizer.api.rate_limit

Rate limiting helpers for API requests.

Classes

RateLimitResult(allowed, remaining, reset_at) dataclass

Result of a rate limit check.

RateLimiter

Bases: Protocol

Protocol for rate limit backends.

Functions
check(key, limit, window_seconds)

Check rate limit for a key and return the remaining quota.

Source code in src/file_organizer/api/rate_limit.py
def check(self, key: str, limit: int, window_seconds: int) -> RateLimitResult:
    """Check rate limit for a key and return the remaining quota.

    Protocol stub: implementations record one hit for `key` against a
    fixed window of `window_seconds` that allows at most `limit` hits,
    and return a RateLimitResult (allowed flag, remaining quota, reset time).
    """

RateLimitState(count, reset_at) dataclass

Mutable rate limit state for a single key.

InMemoryRateLimiter(max_entries=10000, sweep_interval_seconds=60)

Simple in-memory fixed-window rate limiter.

Initialize InMemoryRateLimiter with given capacity and sweep interval.

Source code in src/file_organizer/api/rate_limit.py
def __init__(
    self,
    max_entries: int = 10000,
    sweep_interval_seconds: int = 60,
) -> None:
    """Initialize InMemoryRateLimiter with given capacity and sweep interval.

    Args:
        max_entries: Soft cap on tracked keys; an early sweep of expired
            entries is triggered when the state map reaches this size.
        sweep_interval_seconds: Minimum seconds between periodic sweeps.
    """
    self._state: dict[str, RateLimitState] = {}  # per-key fixed-window state
    self._last_sweep: int = 0  # epoch seconds of the last sweep (0 = never swept)
    self._max_entries = max_entries
    self._sweep_interval_seconds = sweep_interval_seconds
Functions
check(key, limit, window_seconds)

Check rate limit for key and return the result.

Source code in src/file_organizer/api/rate_limit.py
def check(self, key: str, limit: int, window_seconds: int) -> RateLimitResult:
    """Check rate limit for key and return the result.

    Fixed-window counting: the first hit for a key opens a window of
    window_seconds; subsequent hits increment the counter until the
    window expires.
    """
    now = int(time.time())
    # Sweep expired entries periodically, or early when the map is full.
    overdue = now - self._last_sweep >= self._sweep_interval_seconds
    if overdue or len(self._state) >= self._max_entries:
        self._sweep(now)

    entry = self._state.get(key)
    if entry is not None and entry.reset_at > now:
        # Existing, still-open window: count this hit against it.
        entry.count += 1
        return RateLimitResult(
            allowed=entry.count <= limit,
            remaining=max(limit - entry.count, 0),
            reset_at=entry.reset_at,
        )

    # Unknown key or expired window: open a fresh window with this hit counted.
    reset_at = now + window_seconds
    self._state[key] = RateLimitState(count=1, reset_at=reset_at)
    return RateLimitResult(allowed=True, remaining=max(limit - 1, 0), reset_at=reset_at)

RedisRateLimiter(redis, prefix='ratelimit:')

Redis-backed fixed-window rate limiter.

Initialize RedisRateLimiter with a Redis client and key prefix.

Source code in src/file_organizer/api/rate_limit.py
def __init__(self, redis: Redis, prefix: str = "ratelimit:") -> None:
    """Initialize RedisRateLimiter with a Redis client and key prefix.

    Args:
        redis: Connected Redis client used for the window counters.
        prefix: Prefix stored for deriving per-key Redis keys (used by the
            internal _key helper — see check()).
    """
    self._redis = redis
    self._prefix = prefix
Functions
check(key, limit, window_seconds)

Check rate limit for key against Redis and return the result.

Source code in src/file_organizer/api/rate_limit.py
def check(self, key: str, limit: int, window_seconds: int) -> RateLimitResult:
    """Check rate limit for key against Redis and return the result.

    A small Lua script increments the window counter and sets its expiry
    atomically, so concurrent callers cannot create an expiry-less key.
    """
    now = int(time.time())
    redis_key = self._key(key)
    script = """
    local current = redis.call("INCR", KEYS[1])
    if current == 1 then
      redis.call("EXPIRE", KEYS[1], ARGV[1])
    end
    local ttl = redis.call("TTL", KEYS[1])
    return {current, ttl}
    """
    raw_count, raw_ttl = self._redis.eval(script, 1, redis_key, window_seconds)
    # TTL of -1/-2 (no expiry / missing key) falls back to a full window.
    if raw_ttl is None or int(raw_ttl) < 0:
        raw_ttl = window_seconds
    count = int(raw_count)
    return RateLimitResult(
        allowed=count <= limit,
        remaining=max(limit - count, 0),
        reset_at=now + int(raw_ttl),
    )

Functions

build_rate_limiter(redis_url)

Create a rate limiter instance.

Source code in src/file_organizer/api/rate_limit.py
def build_rate_limiter(redis_url: Optional[str]) -> RateLimiter:
    """Create a rate limiter instance.

    Uses Redis when a URL is given and reachable; otherwise falls back to
    the in-memory limiter.
    """
    if redis_url:
        try:
            client = Redis.from_url(redis_url, decode_responses=True)
            client.ping()  # Fail fast if the server is unreachable.
            return RedisRateLimiter(client)
        except Exception as exc:
            # Deliberate best-effort: any connection problem degrades to in-memory.
            logger.warning("Rate limiter Redis unavailable, using in-memory limiter: {}", exc)
    return InMemoryRateLimiter()

Authentication Rate Limiting

file_organizer.api.auth_rate_limit

Login rate limiting helpers.

Classes

LoginRateLimiter

Bases: Protocol

Protocol for login rate limiting backends.

Functions
is_blocked(key)

Return (blocked, retry_after_seconds).

Source code in src/file_organizer/api/auth_rate_limit.py
def is_blocked(self, key: str) -> tuple[bool, int]:
    """Return (blocked, retry_after_seconds)."""
record_failure(key)

Record a failed attempt and return (blocked, retry_after_seconds).

Source code in src/file_organizer/api/auth_rate_limit.py
def record_failure(self, key: str) -> tuple[bool, int]:
    """Record a failed attempt and return (blocked, retry_after_seconds)."""
reset(key)

Clear rate limit state for a key.

Source code in src/file_organizer/api/auth_rate_limit.py
def reset(self, key: str) -> None:
    """Clear rate limit state for a key."""

RateLimitState(count, expires_at) dataclass

Track rate limit count and expiry for a key.

Functions
remaining(now)

Return remaining seconds until window expiry.

Source code in src/file_organizer/api/auth_rate_limit.py
def remaining(self, now: float) -> int:
    """Return remaining seconds until window expiry.

    Clamped at zero so an already-expired window never reports a
    negative retry-after value.
    """
    seconds_left = int(self.expires_at - now)
    return seconds_left if seconds_left > 0 else 0

InMemoryLoginRateLimiter(max_attempts, window_seconds, _state=dict()) dataclass

In-memory fixed-window rate limiter for login attempts.

Functions
is_blocked(key)

Return whether the key is currently blocked and retry-after seconds.

Source code in src/file_organizer/api/auth_rate_limit.py
def is_blocked(self, key: str) -> tuple[bool, int]:
    """Return whether the key is currently blocked and retry-after seconds."""
    now = time.time()
    state = self._get_state(key, now)
    if state is None:
        return False, 0
    if state.count >= self.max_attempts:
        return True, state.remaining(now)
    return False, 0
record_failure(key)

Record a failed attempt and return blocked status and retry-after seconds.

Source code in src/file_organizer/api/auth_rate_limit.py
def record_failure(self, key: str) -> tuple[bool, int]:
    """Record a failed attempt and return blocked status and retry-after seconds."""
    now = time.time()
    state = self._get_state(key, now)
    if state is None:
        state = RateLimitState(count=1, expires_at=now + self.window_seconds)
        self._state[key] = state
    else:
        state.count += 1
    blocked = state.count >= self.max_attempts
    return blocked, state.remaining(now)
reset(key)

Clear rate limit state for the given key.

Source code in src/file_organizer/api/auth_rate_limit.py
def reset(self, key: str) -> None:
    """Clear rate limit state for the given key."""
    self._state.pop(key, None)

RedisLoginRateLimiter(redis, max_attempts, window_seconds, prefix='auth:login:') dataclass

Redis-backed fixed-window login rate limiter.

Functions
is_blocked(key)

Return whether the key is currently blocked and retry-after seconds.

Source code in src/file_organizer/api/auth_rate_limit.py
def is_blocked(self, key: str) -> tuple[bool, int]:
    """Return whether the key is currently blocked and retry-after seconds."""
    redis_key = self._key(key)
    value = self.redis.get(redis_key)
    if value is None:
        return False, 0
    try:
        count = int(value)
    except ValueError:
        self.redis.delete(redis_key)
        return False, 0
    if count >= self.max_attempts:
        return True, self._ttl(redis_key)
    return False, 0
record_failure(key)

Record a failed attempt and return blocked status and retry-after seconds.

Source code in src/file_organizer/api/auth_rate_limit.py
def record_failure(self, key: str) -> tuple[bool, int]:
    """Record a failed attempt and return blocked status and retry-after seconds."""
    redis_key = self._key(key)
    pipe = self.redis.pipeline()
    pipe.incr(redis_key)
    pipe.ttl(redis_key)
    count, ttl = pipe.execute()
    if ttl is None or int(ttl) < 0:
        self.redis.expire(redis_key, self.window_seconds)
        ttl = self.window_seconds
    blocked = int(count) >= self.max_attempts
    return blocked, int(ttl)
reset(key)

Clear rate limit state for the given key.

Source code in src/file_organizer/api/auth_rate_limit.py
def reset(self, key: str) -> None:
    """Clear rate limit state for the given key."""
    self.redis.delete(self._key(key))

Functions

build_login_rate_limiter(redis_url, max_attempts, window_seconds)

Create a login rate limiter, preferring Redis when configured.

Source code in src/file_organizer/api/auth_rate_limit.py
def build_login_rate_limiter(
    redis_url: Optional[str],
    max_attempts: int,
    window_seconds: int,
) -> LoginRateLimiter:
    """Create a login rate limiter, preferring Redis when configured.

    Falls back to the in-memory limiter when no URL is given or the
    Redis server cannot be reached.
    """
    if redis_url:
        try:
            client = Redis.from_url(redis_url, decode_responses=True)
            client.ping()  # Fail fast if the server is unreachable.
            return RedisLoginRateLimiter(
                redis=client,
                max_attempts=max_attempts,
                window_seconds=window_seconds,
            )
        except Exception as exc:
            # Deliberate best-effort: any connection problem degrades to in-memory.
            logger.warning("Auth redis unavailable, using in-memory rate limiter: {}", exc)
    return InMemoryLoginRateLimiter(max_attempts=max_attempts, window_seconds=window_seconds)

Token Store

file_organizer.api.auth_store

Token storage for authentication sessions.

Classes

TokenStore

Bases: Protocol

Protocol for token storage backends.

Functions
store_refresh(jti, user_id, ttl_seconds)

Store a refresh token identifier with TTL.

Source code in src/file_organizer/api/auth_store.py
def store_refresh(self, jti: str, user_id: str, ttl_seconds: int) -> None:
    """Store a refresh token identifier with TTL.

    Protocol stub: implementations persist `jti` (associated with
    `user_id`) for `ttl_seconds`, after which it is no longer active.
    """
    ...
is_refresh_active(jti)

Return True if the refresh token is active.

Source code in src/file_organizer/api/auth_store.py
def is_refresh_active(self, jti: str) -> bool:
    """Return True if the refresh token is active."""
    ...
revoke_refresh(jti)

Revoke a refresh token identifier.

Source code in src/file_organizer/api/auth_store.py
def revoke_refresh(self, jti: str) -> None:
    """Revoke a refresh token identifier."""
    ...
revoke_access(jti, ttl_seconds)

Mark an access token as revoked for the remaining TTL.

Source code in src/file_organizer/api/auth_store.py
def revoke_access(self, jti: str, ttl_seconds: int) -> None:
    """Mark an access token as revoked for the remaining TTL.

    Protocol stub: the revocation marker only needs to live for
    `ttl_seconds` — after that the token has expired on its own.
    """
    ...
is_access_revoked(jti)

Return True if the access token has been revoked.

Source code in src/file_organizer/api/auth_store.py
def is_access_revoked(self, jti: str) -> bool:
    """Return True if the access token has been revoked."""
    ...

InMemoryTokenStore()

Simple in-memory token store for testing or local fallback.

Initialize InMemoryTokenStore with empty refresh and revoked buckets.

Source code in src/file_organizer/api/auth_store.py
def __init__(self) -> None:
    """Initialize InMemoryTokenStore with empty refresh and revoked buckets."""
    self._refresh: dict[str, float] = {}  # jti -> expiry (epoch seconds)
    self._revoked: dict[str, float] = {}  # jti -> expiry of the revocation marker
Functions
store_refresh(jti, user_id, ttl_seconds)

Store a refresh token with the given TTL.

Source code in src/file_organizer/api/auth_store.py
def store_refresh(self, jti: str, user_id: str, ttl_seconds: int) -> None:
    """Store a refresh token with the given TTL.

    user_id is accepted for interface compatibility but not tracked by
    the in-memory backend; only the expiry time is recorded.
    """
    expires_at = time.time() + ttl_seconds
    self._refresh[jti] = expires_at
is_refresh_active(jti)

Return True if the refresh token is active.

Source code in src/file_organizer/api/auth_store.py
def is_refresh_active(self, jti: str) -> bool:
    """Return True if the refresh token is active."""
    return self._is_active(self._refresh, jti)
revoke_refresh(jti)

Revoke a refresh token by JTI.

Source code in src/file_organizer/api/auth_store.py
def revoke_refresh(self, jti: str) -> None:
    """Revoke a refresh token by JTI."""
    self._refresh.pop(jti, None)
revoke_access(jti, ttl_seconds)

Mark an access token as revoked for the remaining TTL.

Source code in src/file_organizer/api/auth_store.py
def revoke_access(self, jti: str, ttl_seconds: int) -> None:
    """Mark an access token as revoked for the remaining TTL."""
    self._revoked[jti] = time.time() + ttl_seconds
is_access_revoked(jti)

Return True if the access token has been revoked.

Source code in src/file_organizer/api/auth_store.py
def is_access_revoked(self, jti: str) -> bool:
    """Return True if the access token has been revoked."""
    return self._is_active(self._revoked, jti)

RedisTokenStore(redis, refresh_prefix='auth:refresh:', revoked_prefix='auth:revoked:') dataclass

Redis-backed token store for production use.

Functions
store_refresh(jti, user_id, ttl_seconds)

Store a refresh token with the given TTL in Redis.

Source code in src/file_organizer/api/auth_store.py
def store_refresh(self, jti: str, user_id: str, ttl_seconds: int) -> None:
    """Store a refresh token with the given TTL in Redis."""
    self.redis.setex(self._refresh_key(jti), ttl_seconds, user_id)
is_refresh_active(jti)

Return True if the refresh token is active in Redis.

Source code in src/file_organizer/api/auth_store.py
def is_refresh_active(self, jti: str) -> bool:
    """Return True if the refresh token is active in Redis."""
    return self.redis.exists(self._refresh_key(jti)) == 1
revoke_refresh(jti)

Revoke a refresh token in Redis.

Source code in src/file_organizer/api/auth_store.py
def revoke_refresh(self, jti: str) -> None:
    """Revoke a refresh token in Redis."""
    self.redis.delete(self._refresh_key(jti))
revoke_access(jti, ttl_seconds)

Mark an access token as revoked in Redis.

Source code in src/file_organizer/api/auth_store.py
def revoke_access(self, jti: str, ttl_seconds: int) -> None:
    """Mark an access token as revoked in Redis."""
    self.redis.setex(self._revoked_key(jti), ttl_seconds, "1")
is_access_revoked(jti)

Return True if the access token has been revoked in Redis.

Source code in src/file_organizer/api/auth_store.py
def is_access_revoked(self, jti: str) -> bool:
    """Return True if the access token has been revoked in Redis."""
    return self.redis.exists(self._revoked_key(jti)) == 1

Functions

build_token_store(redis_url)

Create a token store, preferring Redis when configured.

Source code in src/file_organizer/api/auth_store.py
def build_token_store(redis_url: Optional[str]) -> TokenStore:
    """Create a token store, preferring Redis when configured."""
    if not redis_url:
        return InMemoryTokenStore()
    try:
        client = Redis.from_url(redis_url, decode_responses=True)
        client.ping()
        return RedisTokenStore(client)
    except Exception as exc:
        logger.warning("Auth redis unavailable, using in-memory token store: {}", exc)
        return InMemoryTokenStore()

Caching

file_organizer.api.cache

Cache abstraction for API persistence layers.

Provides a small key/value interface with an in-memory implementation and an optional Redis backend.

Classes

CacheBackend

Bases: Protocol

Minimal cache backend contract.

Functions
get(key)

Return cached value for key, or None when absent/expired.

Source code in src/file_organizer/api/cache.py
def get(self, key: str) -> Optional[str]:
    """Return cached value for *key*, or None when absent/expired."""
set(key, value, *, ttl_seconds)

Store value for key with TTL.

Source code in src/file_organizer/api/cache.py
def set(self, key: str, value: str, *, ttl_seconds: int) -> None:
    """Store *value* for *key* with TTL."""
delete(key)

Delete key if present.

Source code in src/file_organizer/api/cache.py
def delete(self, key: str) -> None:
    """Delete *key* if present."""
close()

Release any backend resources.

Source code in src/file_organizer/api/cache.py
def close(self) -> None:
    """Release any backend resources."""

InMemoryCache()

In-process TTL cache implementation.

Thread-safe: all access to _entries is protected by a lock.

Initialize InMemoryCache with empty entries dict.

Source code in src/file_organizer/api/cache.py
def __init__(self) -> None:
    """Initialize InMemoryCache with empty entries dict."""
    self._entries: dict[str, _MemoryEntry] = {}
    self._lock = threading.Lock()
Functions
get(key)

Return the cached value for key, or None if absent or expired.

Source code in src/file_organizer/api/cache.py
def get(self, key: str) -> Optional[str]:
    """Return the cached value for key, or None if absent or expired."""
    with self._lock:
        entry = self._entries.get(key)
        if entry is None:
            return None
        if entry.expires_at < time.time():
            self._entries.pop(key, None)
            return None
        return entry.value
set(key, value, *, ttl_seconds)

Store value for key with the given TTL.

Source code in src/file_organizer/api/cache.py
def set(self, key: str, value: str, *, ttl_seconds: int) -> None:
    """Store value for key with the given TTL."""
    expires_at = time.time() + max(1, ttl_seconds)
    with self._lock:
        self._entries[key] = _MemoryEntry(value=value, expires_at=expires_at)
delete(key)

Delete key from the cache if present.

Source code in src/file_organizer/api/cache.py
def delete(self, key: str) -> None:
    """Delete key from the cache if present."""
    with self._lock:
        self._entries.pop(key, None)
close()

Clear all cache entries.

Source code in src/file_organizer/api/cache.py
def close(self) -> None:
    """Clear all cache entries."""
    with self._lock:
        self._entries.clear()

RedisCache(redis_url)

Redis-backed cache implementation.

Initialize RedisCache with a connection to redis_url.

Source code in src/file_organizer/api/cache.py
def __init__(self, redis_url: str) -> None:
    """Initialize RedisCache with a connection to redis_url."""
    if Redis is None:
        raise RuntimeError("redis package not installed")
    self._redis = Redis.from_url(redis_url, decode_responses=True)
Functions
get(key)

Return the cached value for key, or None on miss or error.

Source code in src/file_organizer/api/cache.py
def get(self, key: str) -> Optional[str]:
    """Return the cached value for key, or None on miss or error."""
    try:
        value = self._redis.get(key)
    except RedisError as exc:
        logger.warning("Redis cache get failed for {}: {}", key, exc)
        return None
    return value if isinstance(value, str) else None
set(key, value, *, ttl_seconds)

Store value for key with the given TTL in Redis.

Source code in src/file_organizer/api/cache.py
def set(self, key: str, value: str, *, ttl_seconds: int) -> None:
    """Store value for key with the given TTL in Redis."""
    try:
        self._redis.setex(key, max(1, ttl_seconds), value)
    except RedisError as exc:
        logger.warning("Redis cache set failed for {}: {}", key, exc)
delete(key)

Delete key from Redis if present.

Source code in src/file_organizer/api/cache.py
def delete(self, key: str) -> None:
    """Delete key from Redis if present."""
    try:
        self._redis.delete(key)
    except RedisError as exc:
        logger.warning("Redis cache delete failed for {}: {}", key, exc)
close()

Close the Redis connection.

Source code in src/file_organizer/api/cache.py
def close(self) -> None:
    """Close the Redis connection."""
    try:
        self._redis.close()
    except RedisError as exc:
        logger.warning("Redis cache close failed: {}", exc)

Functions

build_cache_backend(redis_url)

Build a cache backend from configuration.

Falls back to in-memory cache when Redis is unavailable or connection validation fails.

Source code in src/file_organizer/api/cache.py
def build_cache_backend(redis_url: Optional[str]) -> CacheBackend:
    """Build a cache backend from configuration.

    Falls back to in-memory cache when Redis is unavailable or connection
    validation fails.
    """
    if not redis_url:
        return InMemoryCache()
    if not _is_valid_redis_url(redis_url):
        logger.warning("Invalid Redis URL scheme; falling back to in-memory cache")
        return InMemoryCache()

    try:
        backend = RedisCache(redis_url)
        backend.set("__fo_cache_health__", json.dumps({"ok": True}), ttl_seconds=5)
        return backend
    except (RedisError, RuntimeError, ValueError, OSError) as exc:
        logger.warning(
            "Falling back to in-memory cache (redis unavailable: {}): {}",
            type(exc).__name__,
            exc,
        )
        return InMemoryCache()

Middleware

file_organizer.api.middleware

Middleware setup for the API layer.

Classes

RateLimitMiddleware(app, settings, limiter)

Bases: BaseHTTPMiddleware

Apply rate limiting based on endpoint and client identity.

Initialize RateLimitMiddleware with app, settings, and limiter.

Source code in src/file_organizer/api/middleware.py
def __init__(self, app: FastAPI, settings: ApiSettings, limiter: RateLimiter) -> None:
    """Initialize RateLimitMiddleware with app, settings, and limiter."""
    super().__init__(app)
    self._settings = settings
    self._limiter = limiter
    self._rule_prefixes = sorted(settings.rate_limit_rules.keys(), key=len, reverse=True)
Functions
dispatch(request, call_next) async

Process an HTTP request through the rate limiter.

Source code in src/file_organizer/api/middleware.py
async def dispatch(
    self,
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
    """Process an HTTP request through the rate limiter."""
    if not self._settings.rate_limit_enabled or request.scope.get("type") != "http":
        return await call_next(request)

    path = request.url.path
    if self._is_exempt(path):
        return await call_next(request)

    rule = self._rule_for_path(path)
    limit = self._settings.rate_limit_default_requests
    window = self._settings.rate_limit_default_window_seconds
    if rule:
        limit = rule.get("requests", limit)
        window = rule.get("window_seconds", window)

    key = f"{self._client_id(request)}:{path}"
    result = self._limiter.check(key, limit, window)
    if not result.allowed:
        retry_after = max(result.reset_at - int(time.time()), 0)
        response: Response = JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded. Try again later."},
        )
        response.headers["Retry-After"] = str(retry_after)
        self._apply_headers(response, result, limit)
        return response

    response = await call_next(request)
    self._apply_headers(response, result, limit)
    return response

SecurityHeadersMiddleware(app, settings)

Bases: BaseHTTPMiddleware

Attach security headers to API responses.

Initialize SecurityHeadersMiddleware with app and settings.

Source code in src/file_organizer/api/middleware.py
def __init__(self, app: FastAPI, settings: ApiSettings) -> None:
    """Initialize SecurityHeadersMiddleware with app and settings."""
    super().__init__(app)
    self._settings = settings
Functions
dispatch(request, call_next) async

Process an HTTP request and attach security headers to the response.

Source code in src/file_organizer/api/middleware.py
async def dispatch(
    self,
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
    """Process an HTTP request and attach security headers to the response."""
    response = await call_next(request)
    if not self._settings.security_headers_enabled:
        return response

    response.headers.setdefault("X-Frame-Options", "DENY")
    response.headers.setdefault("X-Content-Type-Options", "nosniff")
    response.headers.setdefault("X-XSS-Protection", "1; mode=block")
    response.headers.setdefault("Referrer-Policy", self._settings.security_referrer_policy)
    response.headers.setdefault(
        "Permissions-Policy",
        "geolocation=(), microphone=(), camera=(), payment=()",
    )
    response.headers.setdefault("Content-Security-Policy", self._settings.security_csp)

    if request.url.scheme == "https" and self._settings.security_hsts_seconds > 0:
        hsts = f"max-age={self._settings.security_hsts_seconds}"
        if self._settings.security_hsts_subdomains:
            hsts += "; includeSubDomains"
        response.headers.setdefault("Strict-Transport-Security", hsts)

    return response

Functions

setup_middleware(app, settings)

Configure middleware on the FastAPI app.

Source code in src/file_organizer/api/middleware.py
def setup_middleware(app: FastAPI, settings: ApiSettings) -> None:
    """Configure middleware on the FastAPI app."""
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=settings.cors_allow_methods,
        allow_headers=settings.cors_allow_headers,
    )
    app.add_middleware(
        RateLimitMiddleware,
        settings=settings,
        limiter=build_rate_limiter(settings.auth_redis_url),
    )
    app.add_middleware(SecurityHeadersMiddleware, settings=settings)

Dependencies

file_organizer.api.dependencies

Dependency providers for the API layer.

Classes

AnonymousUser(id='anonymous', username='anonymous', email='anonymous@example.com', full_name=None, is_active=True, is_admin=True, created_at=(lambda: datetime.now(UTC))(), last_login=None) dataclass

Anonymous user identity used when auth is disabled.

ApiKeyIdentity(id, username, email='api-key@example.com', full_name=None, is_active=True, is_admin=False, created_at=(lambda: datetime.now(UTC))(), last_login=None, auth_type='api_key') dataclass

API key-based user identity.

Functions

get_settings() cached

Return API settings for request handlers.

Source code in src/file_organizer/api/dependencies.py
@lru_cache
def get_settings() -> ApiSettings:
    """Return API settings for request handlers."""
    return load_settings()

get_config_manager() cached

Return a config manager, optionally overridden by FO_CONFIG_DIR.

Source code in src/file_organizer/api/dependencies.py
@lru_cache
def get_config_manager() -> ConfigManager:
    """Return a config manager, optionally overridden by FO_CONFIG_DIR."""
    config_dir = os.environ.get("FO_CONFIG_DIR")
    return ConfigManager(config_dir=config_dir)

get_db(settings=Depends(get_settings))

Yield a database session for auth data.

Source code in src/file_organizer/api/dependencies.py
def get_db(settings: ApiSettings = Depends(get_settings)) -> Generator[Session, None, None]:
    """Yield a database session for auth data."""
    session = create_session(settings.auth_db_path)
    try:
        yield session
    finally:
        session.close()

get_token_store(settings=Depends(get_settings))

Return the token store for the current settings.

Source code in src/file_organizer/api/dependencies.py
def get_token_store(settings: ApiSettings = Depends(get_settings)) -> TokenStore:
    """Return the token store for the current settings."""
    return _token_store_cached(settings.auth_redis_url)

get_login_rate_limiter(settings=Depends(get_settings))

Return the login rate limiter for the current settings.

Source code in src/file_organizer/api/dependencies.py
def get_login_rate_limiter(settings: ApiSettings = Depends(get_settings)) -> LoginRateLimiter:
    """Return the login rate limiter for the current settings."""
    return _login_rate_limiter_cached(
        settings.auth_redis_url,
        settings.auth_login_max_attempts,
        settings.auth_login_window_seconds,
    )

get_current_user(request, token=Depends(oauth2_scheme), settings=Depends(get_settings), db=Depends(get_db), token_store=Depends(get_token_store))

Resolve and return the current authenticated user.

Source code in src/file_organizer/api/dependencies.py
def get_current_user(
    request: Request,
    token: Optional[str] = Depends(oauth2_scheme),
    settings: ApiSettings = Depends(get_settings),
    db: Session = Depends(get_db),
    token_store: TokenStore = Depends(get_token_store),
) -> UserLike:
    """Resolve and return the current authenticated user."""
    if not settings.auth_enabled:
        return AnonymousUser()
    if not token:
        api_key = request.headers.get(settings.api_key_header)
        if settings.api_key_enabled and api_key:
            key_id = getattr(request.state, "api_key_identifier", None)
            if not key_id:
                key_id = api_key_identifier(api_key, settings.api_key_hashes)
            if key_id:
                return ApiKeyIdentity(
                    id=f"api-key:{key_id}",
                    username=f"api-key-{key_id}",
                    is_admin=settings.api_key_admin,
                )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    try:
        payload = decode_token(token, settings)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    if not is_access_token(payload):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid access token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    jti = payload.get("jti")
    if isinstance(jti, str) and token_store.is_access_revoked(jti):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Access token revoked",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user_id = payload.get("user_id")
    if not isinstance(user_id, str):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid access token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

get_current_active_user(user=Depends(get_current_user), settings=Depends(get_settings))

Return the current user, raising 400 if inactive.

Source code in src/file_organizer/api/dependencies.py
def get_current_active_user(
    user: UserLike = Depends(get_current_user),
    settings: ApiSettings = Depends(get_settings),
) -> UserLike:
    """Return the current user, raising 400 if inactive."""
    if not settings.auth_enabled:
        return user
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user

require_admin_user(user=Depends(get_current_active_user), settings=Depends(get_settings))

Return the current user, raising 403 if not an admin.

Source code in src/file_organizer/api/dependencies.py
def require_admin_user(
    user: UserLike = Depends(get_current_active_user),
    settings: ApiSettings = Depends(get_settings),
) -> UserLike:
    """Return the current user, raising 403 if not an admin."""
    if not settings.auth_enabled:
        return user
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="Admin privileges required")
    return user

Utilities

file_organizer.api.utils

Shared helpers for API routers.

Classes

Functions

resolve_path(path_value, allowed_paths=None)

Expand and normalize a filesystem path.

Source code in src/file_organizer/api/utils.py
def resolve_path(path_value: str, allowed_paths: Optional[list[str]] = None) -> Path:
    """Expand and normalize a filesystem path."""
    # Path is validated against allowed roots below.
    resolved = Path(path_value).expanduser()  # codeql[py/path-injection]
    resolved_str = os.path.realpath(resolved)
    if not allowed_paths:
        raise ApiError(
            status_code=403,
            error="path_not_allowed",
            message="No allowed paths configured for this API instance.",
        )

    # Allowed roots are configuration-controlled.
    # codeql[py/path-injection]
    roots = [os.path.realpath(Path(root).expanduser()) for root in allowed_paths]
    if not roots:
        raise ApiError(
            status_code=403,
            error="path_not_allowed",
            message="No allowed paths configured for this API instance.",
        )
    try:
        allowed = any(os.path.commonpath([resolved_str, root]) == root for root in roots)
    except ValueError:
        allowed = False
    if not allowed:
        raise ApiError(
            status_code=403,
            error="path_not_allowed",
            message="Path is outside allowed roots.",
        )

    return Path(resolved_str)

is_hidden(path)

Return True if any part of the path is hidden.

Source code in src/file_organizer/api/utils.py
def is_hidden(path: Path) -> bool:
    """Return True if any part of the path is hidden."""
    return any(part.startswith(".") for part in path.parts)

file_info_from_path(path)

Build FileInfo from a filesystem path, raising ApiError on failure.

Source code in src/file_organizer/api/utils.py
def file_info_from_path(path: Path) -> FileInfo:
    """Build FileInfo from a filesystem path, raising ApiError on failure."""
    try:
        stat = path.stat()
    except OSError as exc:
        if isinstance(exc, FileNotFoundError):
            raise ApiError(
                status_code=404,
                error="file_not_found",
                message=f"File not found: {path}",
            ) from exc
        if isinstance(exc, PermissionError):
            raise ApiError(
                status_code=403,
                error="file_access_error",
                message=f"Permission denied for {path}",
            ) from exc
        raise ApiError(
            status_code=500,
            error="file_access_error",
            message=f"Unable to access file metadata for {path}",
        ) from exc
    mime_type, _ = mimetypes.guess_type(path.as_posix())
    # Cross-platform creation time: st_birthtime (macOS), st_ctime (Windows),
    # st_mtime fallback (Linux — st_ctime is inode-change time, not creation).
    if hasattr(stat, "st_birthtime"):
        creation_ref = stat.st_birthtime
    elif os.name == "nt":
        creation_ref = stat.st_ctime
    else:
        creation_ref = stat.st_mtime
    return FileInfo(
        path=str(path),
        name=path.name,
        size=stat.st_size,
        created=datetime.fromtimestamp(creation_ref, tz=UTC),
        modified=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
        file_type=path.suffix.lower() or "",
        mime_type=mime_type,
    )

Exceptions

file_organizer.api.exceptions

Exception handlers for the API layer.

Classes

ApiError(status_code, error, message, details=None) dataclass

Bases: Exception

Structured API error for consistent responses.

Functions
__post_init__()

Initialize ApiError and set exception message from fields.

Source code in src/file_organizer/api/exceptions.py
def __post_init__(self) -> None:
    """Initialize ApiError and set exception message from fields."""
    summary = f"{self.status_code} {self.error}: {self.message}"
    super().__init__(summary)

Functions

setup_exception_handlers(app)

Register exception handlers on the app.

Source code in src/file_organizer/api/exceptions.py
def setup_exception_handlers(app: FastAPI) -> None:
    """Register exception handlers on the app."""

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(
        request: Request,
        exc: RequestValidationError,
    ) -> JSONResponse:
        logger.warning("Validation error on {}: {}", request.url.path, exc)
        return JSONResponse(
            status_code=422,
            content={
                "error": "validation_error",
                "message": "Invalid request payload.",
                "details": [{"loc": err.get("loc"), "msg": err.get("msg")} for err in exc.errors()],
            },
        )

    @app.exception_handler(ApiError)
    async def api_error_handler(
        request: Request,
        exc: ApiError,
    ) -> JSONResponse:
        logger.warning("API error on {}: {}", request.url.path, exc.error)
        payload: dict[str, Any] = {
            "error": exc.error,
            "message": exc.message,
        }
        if exc.details is not None:
            payload["details"] = exc.details
        return JSONResponse(status_code=exc.status_code, content=payload)

    @app.exception_handler(Exception)
    async def unhandled_exception_handler(
        request: Request,
        exc: Exception,
    ) -> JSONResponse:
        logger.exception("Unhandled error on {}", request.url.path)
        return JSONResponse(
            status_code=500,
            content={
                "error": "internal_server_error",
                "message": "Unexpected server error.",
            },
        )

Authentication Models

file_organizer.api.auth_models

SQLAlchemy models for API authentication.

Classes

User

Bases: Base

User model for API authentication.

Functions
__repr__()

Return string representation of User.

Source code in src/file_organizer/api/auth_models.py
def __repr__(self) -> str:
    """Return string representation of User."""
    return f"<User {self.username}>"

Server

file_organizer.api.main

FastAPI application entrypoint.

Classes

Functions

configure_logging(settings)

Configure structured logging to console and file.

Source code in src/file_organizer/api/main.py
def configure_logging(settings: ApiSettings) -> None:
    """Configure structured logging to console and file."""
    global _LOGGING_CONFIGURED
    if _LOGGING_CONFIGURED:
        return

    from file_organizer.config.path_manager import get_state_dir

    log_dir = get_state_dir() / "logs"
    log_file: Optional[Path] = None
    try:
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / "api.log"
    except OSError as exc:
        print(f"Warning: failed to create log directory {log_dir}: {exc}", file=sys.stderr)

    logger.remove()
    logger.add(sys.stdout, level=settings.log_level, enqueue=True)
    if log_file is not None:
        logger.add(log_file, level=settings.log_level, rotation="10 MB", retention="14 days")

    _LOGGING_CONFIGURED = True

create_app(settings=None)

Create the FastAPI application.

Source code in src/file_organizer/api/main.py
def create_app(settings: Optional[ApiSettings] = None) -> FastAPI:
    """Create the FastAPI application."""
    settings = settings or load_settings()
    configure_logging(settings)

    docs_url = "/docs" if settings.enable_docs else None
    redoc_url = "/redoc" if settings.enable_docs else None

    @asynccontextmanager
    async def lifespan(_: FastAPI) -> AsyncIterator[None]:
        logger.info("Starting API in {} mode", settings.environment)
        yield
        logger.info("Shutting down API")

    app = FastAPI(
        title=settings.app_name,
        version=settings.version,
        description="REST API for AI-powered file organization",
        docs_url=docs_url,
        redoc_url=redoc_url,
        lifespan=lifespan,
    )

    if STATIC_DIR.exists():
        app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
    else:
        logger.warning("Static assets directory not found at {}", STATIC_DIR)

    setup_middleware(app, settings)
    setup_exception_handlers(app)
    app.dependency_overrides[get_settings] = lambda: settings
    app.state.integration_manager = build_integration_manager(settings)
    app.state.browser_extension_manager = build_browser_extension_manager(settings)

    app.include_router(web_router, prefix="/ui")
    app.include_router(health_router, prefix="/api/v1")
    app.include_router(auth_router, prefix="/api/v1")
    app.include_router(files_router, prefix="/api/v1")
    app.include_router(organize_router, prefix="/api/v1")
    app.include_router(analyze_router, prefix="/api/v1")
    app.include_router(search_router, prefix="/api/v1")
    app.include_router(config_router, prefix="/api/v1")
    app.include_router(dedupe_router, prefix="/api/v1")
    app.include_router(realtime_router, prefix="/api/v1")
    app.include_router(system_router, prefix="/api/v1")
    app.include_router(integrations_router, prefix="/api/v1")
    app.include_router(marketplace_router, prefix="/api/v1")
    app.include_router(plugin_api_router, prefix="/api/v1")
    if daemon_router is not None:
        app.include_router(daemon_router, prefix="/api/v1")

    @app.get("/")
    def root() -> dict[str, str]:
        return {
            "name": settings.app_name,
            "version": settings.version,
        }

    return app

get_app()

Get or create the FastAPI application instance (thread-safe).

This function implements lazy initialization with thread safety to avoid:

- Import-time side effects (creating `.config` directories)
- Multiple app instances due to race conditions in multi-threaded contexts
- Test isolation issues in concurrent test environments

The first call to this function will trigger app creation via create_app(). Subsequent calls return the cached instance. Thread-safe via lock protection.

Intended for test infrastructure, application startup hooks, and ASGI servers running multiple worker threads.

Returns:

| Type | Description |
| --- | --- |
| `FastAPI` | The initialized and cached FastAPI application instance. |

Source code in src/file_organizer/api/main.py
def get_app() -> FastAPI:
    """Get or create the FastAPI application instance (thread-safe).

    This function implements lazy initialization with thread safety to avoid:
    - Import-time side effects (creating .config directories)
    - Multiple app instances due to race conditions in multi-threaded contexts
    - Test isolation issues in concurrent test environments

    The first call to this function will trigger app creation via create_app().
    Subsequent calls return the cached instance. Thread-safe via lock protection.

    Intended for: Test infrastructure, application startup hooks, ASGI servers
    with multiple worker threads

    Returns:
        The initialized and cached FastAPI application instance.
    """
    global _app

    # Quick check without lock for performance (reading stale value is acceptable)
    if _app is not None:
        return _app

    # Double-checked locking pattern for thread-safe lazy initialization
    with _app_lock:
        # Re-check after acquiring lock (another thread may have initialized)
        if _app is None:
            _app = create_app()

        return _app