Security

security

Central security utilities for Linux AI NPU Assistant.

All security-sensitive operations are consolidated here so they can be reviewed, tested, and updated in one place.

Responsibilities

  • URL validation: block external hosts when network.allow_external is off.
  • Response sanitisation: strip control characters / oversized AI output before it reaches the UI or tool dispatcher.
  • Secure file I/O: atomic writes with owner-only (0o600) permissions.
  • Path permission checks: warn when config/history files are world-readable.
  • Rate limiting: token-bucket guard on AI backend calls.
  • Tool argument validation: sanitise AI-supplied JSON args before dispatch.
  • Secret masking: redact API keys in log output.

ExternalNetworkBlockedError

Bases: RuntimeError

Raised when a request to an external host is attempted while network.allow_external is False.

RateLimitExceededError

Bases: RuntimeError

Raised when the backend call rate limit is exceeded.

RateLimiter

RateLimiter(calls_per_minute=0)

Thread-safe token-bucket rate limiter for AI backend calls.

Parameters

calls_per_minute: Maximum number of calls allowed per minute. 0 disables limiting.

Usage

limiter = RateLimiter(calls_per_minute=30)
limiter.check()   # raises RateLimitExceededError if over limit

Source code in src/security.py
def __init__(self, calls_per_minute: int = 0) -> None:
    self._limit = calls_per_minute
    self._lock = threading.Lock()
    # Token bucket state
    self._tokens: float = float(max(calls_per_minute, 0))
    self._last_refill: float = time.monotonic()

check

check()

Consume one token or raise RateLimitExceededError.

Call this immediately before every AI backend request.

Source code in src/security.py
def check(self) -> None:
    """Consume one token or raise :class:`RateLimitExceededError`.

    Call this immediately before every AI backend request.
    """
    if self._limit <= 0:
        return  # Limiting disabled
    with self._lock:
        self._refill()
        if self._tokens < 1.0:
            raise RateLimitExceededError(
                f"AI backend rate limit exceeded ({self._limit} calls/min). "
                "Please wait a moment before sending another message."
            )
        self._tokens -= 1.0
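
The listing above calls self._refill(), which is not shown on this page. The following is a minimal sketch of how such a refill step could work, assuming tokens are replenished continuously at calls_per_minute / 60 per second and capped at calls_per_minute. The class name RateLimiterSketch and the injectable clock parameter are hypothetical additions for illustration, not part of src/security.py.

```python
import threading
import time


class RateLimitExceededError(RuntimeError):
    """Stand-in for the exception documented above."""


class RateLimiterSketch:
    """Illustrative token bucket; the real _refill may differ."""

    def __init__(self, calls_per_minute=0, clock=time.monotonic):
        self._limit = calls_per_minute
        self._clock = clock  # hypothetical: injectable so the demo needs no sleeping
        self._lock = threading.Lock()
        self._tokens = float(max(calls_per_minute, 0))
        self._last_refill = clock()

    def _refill(self):
        # Assumed behaviour: add limit/60 tokens per elapsed second, capped at limit.
        now = self._clock()
        elapsed = now - self._last_refill
        self._last_refill = now
        self._tokens = min(float(self._limit),
                           self._tokens + elapsed * self._limit / 60.0)

    def check(self):
        if self._limit <= 0:
            return  # limiting disabled
        with self._lock:
            self._refill()
            if self._tokens < 1.0:
                raise RateLimitExceededError(
                    f"rate limit exceeded ({self._limit} calls/min)"
                )
            self._tokens -= 1.0


# Demo with a fake clock: two calls drain the bucket, the third is rejected,
# and 30 simulated seconds later one token is available again.
fake_now = [0.0]
limiter = RateLimiterSketch(calls_per_minute=2, clock=lambda: fake_now[0])
limiter.check()
limiter.check()
try:
    limiter.check()
    blocked = False
except RateLimitExceededError:
    blocked = True

fake_now[0] += 30.0
limiter.check()  # succeeds after the simulated wait
allowed_after_wait = True
```

Refilling lazily inside check() means no background timer thread is needed; the lock only has to cover the refill and the decrement.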

is_local_url

is_local_url(url)

Return True if the host in url is localhost, a loopback address, or a private address (as classified by the ipaddress module's is_private, which includes the RFC 1918 ranges).

No DNS lookup is performed. Only bare IP addresses and the hostname localhost are accepted as local. Any other hostname (e.g. my-server.lan) is treated as potentially external and rejected when external traffic is off.

Source code in src/security.py
def is_local_url(url: str) -> bool:
    """Return *True* if *url* resolves to a loopback or RFC-1918 private address.

    Only bare IP addresses and the hostname ``localhost``/``::1`` are accepted
    as local.  Any hostname that is not a bare IP (e.g. ``my-server.lan``) is
    treated as potentially external and rejected when external traffic is off.
    """
    host = urlparse(url).hostname or ""
    if host in ("localhost", "::1"):
        return True
    try:
        addr = ipaddress.ip_address(host)
        return addr.is_loopback or addr.is_private
    except ValueError:
        return False
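
A few calls illustrate how URLs are classified. The function body is repeated from the listing above so the snippet runs on its own; the URLs are made-up examples.

```python
import ipaddress
from urllib.parse import urlparse


def is_local_url(url: str) -> bool:
    # Same logic as the listing above.
    host = urlparse(url).hostname or ""
    if host in ("localhost", "::1"):
        return True
    try:
        addr = ipaddress.ip_address(host)
        return addr.is_loopback or addr.is_private
    except ValueError:
        return False


examples = [
    "http://localhost:11434/api/chat",  # hostname localhost
    "http://127.0.0.1:8080",            # IPv4 loopback
    "http://192.168.1.20:11434",        # RFC 1918 private range
    "http://[::1]:8080",                # IPv6 loopback
    "http://my-server.lan:11434",       # hostname, never resolved
    "https://8.8.8.8/v1",               # public IP
]
results = {url: is_local_url(url) for url in examples}
for url, local in results.items():
    print(f"{url}: {'local' if local else 'external'}")
```

Note that my-server.lan is reported as external even if it would resolve to a private address, because the function never consults DNS.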

assert_local_url

assert_local_url(url, allow_external)

Raise ExternalNetworkBlockedError if url is external and external traffic is not permitted.

Parameters

url: Full URL to validate.

allow_external: If True, the check is skipped entirely. Set this only when the user has explicitly opted in via network.allow_external: true.

Source code in src/security.py
def assert_local_url(url: str, allow_external: bool) -> None:
    """Raise :class:`ExternalNetworkBlockedError` if *url* is external and
    external traffic is not permitted.

    Parameters
    ----------
    url:
        Full URL to validate.
    allow_external:
        If ``True`` the check is skipped entirely.  Set this only when the
        user has explicitly opted in via ``network.allow_external: true``.
    """
    if allow_external:
        return
    if not is_local_url(url):
        raise ExternalNetworkBlockedError(
            f"Blocked attempt to contact external host: {url!r}\n"
            "All AI processing must stay local (network.allow_external is false).\n"
            "Point your backend URL at localhost or a private-network address."
        )
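
One way to use this guard is to validate the backend URL once, before any request is built. The sketch below assumes a config dict with a backend_url key and a nested network.allow_external flag; the helper resolve_backend_url and the backend_url key are hypothetical, and assert_local_url is a condensed stand-in for the function listed above.

```python
import ipaddress
from urllib.parse import urlparse


class ExternalNetworkBlockedError(RuntimeError):
    """Stand-in for the exception documented above."""


def assert_local_url(url: str, allow_external: bool) -> None:
    # Condensed stand-in: same decision as the documented function.
    if allow_external:
        return
    host = urlparse(url).hostname or ""
    if host in ("localhost", "::1"):
        return
    try:
        addr = ipaddress.ip_address(host)
        if addr.is_loopback or addr.is_private:
            return
    except ValueError:
        pass
    raise ExternalNetworkBlockedError(
        f"Blocked attempt to contact external host: {url!r}"
    )


def resolve_backend_url(config: dict) -> str:
    """Hypothetical helper: return the configured URL only if it passes the guard."""
    url = config["backend_url"]
    allow = config.get("network", {}).get("allow_external", False)
    assert_local_url(url, allow_external=allow)
    return url


local_cfg = {"backend_url": "http://127.0.0.1:11434"}
remote_cfg = {"backend_url": "https://api.example.com/v1"}
opted_in_cfg = {
    "backend_url": "https://api.example.com/v1",
    "network": {"allow_external": True},
}

local_url = resolve_backend_url(local_cfg)
opted_in_url = resolve_backend_url(opted_in_cfg)
try:
    resolve_backend_url(remote_cfg)
    remote_blocked = False
except ExternalNetworkBlockedError:
    remote_blocked = True
```

Defaulting the flag to False when the key is missing keeps the sketch fail-closed.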

sanitize_ai_response

sanitize_ai_response(text, max_chars=_MAX_RESPONSE_CHARS)

Strip dangerous characters from text before it reaches the UI.

  • Removes ANSI escape sequences.
  • Removes C0/C1 control characters (keeps tab, newline, carriage-return).
  • Truncates to max_chars to prevent memory exhaustion from a runaway model.

Parameters

text: Raw text received from the AI backend.

max_chars: Maximum number of characters to return. Text beyond this is dropped and a warning is logged (the UI will show the truncation naturally during streaming).

Returns

str: Sanitised text, safe to display in the UI.

Source code in src/security.py
def sanitize_ai_response(text: str, max_chars: int = _MAX_RESPONSE_CHARS) -> str:
    """Strip dangerous characters from *text* before it reaches the UI.

    - Removes ANSI escape sequences.
    - Removes C0/C1 control characters (keeps tab, newline, carriage-return).
    - Truncates to *max_chars* to prevent memory exhaustion from a runaway model.

    Parameters
    ----------
    text:
        Raw text received from the AI backend.
    max_chars:
        Maximum number of characters to return.  Text beyond this is silently
        dropped (the UI will show the truncation naturally during streaming).

    Returns
    -------
    str
        Sanitised text, safe to display in the UI.
    """
    if not text:
        return text
    cleaned = _CONTROL_CHAR_RE.sub("", text)
    if len(cleaned) > max_chars:
        logger.warning(
            "AI response truncated from %d to %d characters.",
            len(cleaned), max_chars,
        )
        cleaned = cleaned[:max_chars]
    return cleaned
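
The pattern _CONTROL_CHAR_RE and the default _MAX_RESPONSE_CHARS are not shown on this page. The sketch below assumes a pattern that removes ANSI CSI escape sequences and C0/C1 control characters other than tab, newline, and carriage return, and a placeholder limit of 32,000 characters. Both values are illustrative guesses, not the module's actual definitions.

```python
import logging
import re

logger = logging.getLogger(__name__)

# Assumed limit; the real _MAX_RESPONSE_CHARS is not documented here.
MAX_RESPONSE_CHARS_SKETCH = 32_000

# Assumed pattern. The CSI alternative comes first so that a full sequence such
# as ESC[31m is removed as a unit, rather than only its ESC byte.
CONTROL_CHAR_RE_SKETCH = re.compile(
    r"\x1b\[[0-?]*[ -/]*[@-~]"                # ANSI CSI escape sequences
    r"|[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]"  # C0/C1 except \t, \n, \r
)


def sanitize_sketch(text: str, max_chars: int = MAX_RESPONSE_CHARS_SKETCH) -> str:
    if not text:
        return text
    cleaned = CONTROL_CHAR_RE_SKETCH.sub("", text)
    if len(cleaned) > max_chars:
        logger.warning(
            "AI response truncated from %d to %d characters.",
            len(cleaned), max_chars,
        )
        cleaned = cleaned[:max_chars]
    return cleaned


coloured = sanitize_sketch("\x1b[31mred\x1b[0m\x00 ok\tdone\n")
truncated = sanitize_sketch("a" * 10, max_chars=4)
```

Here coloured keeps the visible text, the tab, and the newline, while the colour codes and the null byte are removed.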

secure_write

secure_write(path, data, mode=0o600)

Write data to path atomically with restricted permissions.

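The data is written to a sibling .tmp file first, created with mode (default 0o600, owner read/write only), and then renamed over the target so the target is never partially written. The permissions are applied to the temporary file before the rename, so the target never exists with looser permissions.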

Parameters

path: Destination file path.

data: Text content to write (UTF-8 encoded).

mode: POSIX file permission bits. Default 0o600 restricts the file to the owning user, preventing other local users from reading sensitive data such as conversation history or config files.

Source code in src/security.py
def secure_write(path: str | Path, data: str, mode: int = 0o600) -> None:
    """Write *data* to *path* atomically with restricted permissions.

    The file is written to a sibling ``.tmp`` file first, then renamed so the
    target is never partially written.  The temporary file is created with
    *mode* (default ``0o600``, owner read/write only) before the rename.

    Parameters
    ----------
    path:
        Destination file path.
    data:
        Text content to write (UTF-8 encoded).
    mode:
        POSIX file permission bits.  Default ``0o600`` restricts the file to
        the owning user, preventing other local users from reading sensitive
        data such as conversation history or config files.
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    tmp = p.with_suffix(".tmp")
    try:
        # Remove any existing .tmp file to prevent hijacking and permission retention
        if tmp.exists():
            tmp.unlink()

        # Create file with restrictive permissions atomically to avoid TOCTOU vulnerability
        fd = os.open(tmp, os.O_CREAT | os.O_EXCL | os.O_WRONLY, mode)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(data)

        # Defense-in-depth: explicitly enforce permissions in case umask or OS ignores `mode`
        tmp.chmod(mode)

        tmp.replace(p)
    except OSError:
        # Clean up the temp file if anything went wrong
        if tmp.exists():
            try:
                tmp.unlink()
            except OSError:
                pass
        raise
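
The effect can be checked by writing into a temporary directory and inspecting the result. The function secure_write_sketch below is a condensed stand-in for the listing above (no cleanup on error, no parent directory creation), and the file name history.json is only an example. It assumes a POSIX system where permission bits are honoured.

```python
import os
import stat
import tempfile
from pathlib import Path


def secure_write_sketch(path, data: str, mode: int = 0o600) -> None:
    """Condensed stand-in for secure_write: exclusive create, chmod, rename."""
    p = Path(path)
    tmp = p.with_suffix(".tmp")  # note: replaces the suffix, e.g. history.tmp
    # O_EXCL makes creation fail if the temp file already exists.
    fd = os.open(tmp, os.O_CREAT | os.O_EXCL | os.O_WRONLY, mode)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(data)
    tmp.chmod(mode)   # enforce the mode regardless of umask
    tmp.replace(p)    # atomic rename on POSIX within one filesystem


with tempfile.TemporaryDirectory() as workdir:
    target = Path(workdir) / "history.json"
    secure_write_sketch(target, '{"messages": []}')

    perms = stat.S_IMODE(target.stat().st_mode)
    content = target.read_text(encoding="utf-8")
    tmp_left_behind = (Path(workdir) / "history.tmp").exists()

print(f"mode={perms:o} tmp_left_behind={tmp_left_behind}")
```

Because with_suffix replaces the extension rather than appending to it, two targets that differ only by extension (for example config.yaml and config.json) would share the same temporary name.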

check_path_permissions

check_path_permissions(path, label='file')

Log a warning if path is readable by group or world.

Sensitive files such as conversation history and config files should be readable only by the owning user (mode 0o600 or 0o400).

Parameters

path: File to inspect.

label: Human-readable label used in warning messages (e.g. "config file").

Source code in src/security.py
def check_path_permissions(path: str | Path, label: str = "file") -> None:
    """Log a warning if *path* is readable by group or world.

    Sensitive files such as conversation history and config files should be
    readable only by the owning user (mode ``0o600`` or ``0o400``).

    Parameters
    ----------
    path:
        File to inspect.
    label:
        Human-readable label used in warning messages (e.g. ``"config file"``).
    """
    p = Path(path)
    if not p.exists():
        return
    try:
        file_stat = p.stat()
        if file_stat.st_mode & (stat.S_IRGRP | stat.S_IROTH):
            logger.warning(
                "Security: %s %s is readable by group or world (mode=%o). "
                "Consider restricting it with: chmod 600 %s",
                label, p, file_stat.st_mode & 0o777, p,
            )
    except OSError as exc:
        logger.debug("Could not check permissions of %s: %s", p, exc)
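
The warning hinges on a single bit test against the file mode. The helper below isolates that test so its behaviour on common modes is easy to see; is_group_or_world_readable is a hypothetical name used only for this illustration.

```python
import stat


def is_group_or_world_readable(mode: int) -> bool:
    """Hypothetical helper: the same bit test used by check_path_permissions."""
    return bool(mode & (stat.S_IRGRP | stat.S_IROTH))


verdicts = {
    mode: is_group_or_world_readable(mode)
    for mode in (0o600, 0o400, 0o640, 0o644)
}
for mode, flagged in verdicts.items():
    print(f"{mode:o}: {'warn' if flagged else 'ok'}")
```

Only the read bits are inspected, so a mode such as 0o620 (group write without group read) would not trigger the warning.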

validate_tool_args

validate_tool_args(args, schema=None)

Sanitise AI-supplied tool arguments before dispatch.

  • Strips null bytes from all string values.
  • Truncates oversized string values to _MAX_ARG_STRING_LEN.
  • Optionally validates args against a JSON-schema properties map to ensure required fields are present and types match.

Parameters

args: Raw argument dict supplied by the AI (already JSON-decoded).

schema: Optional JSON Schema dict with a "properties" key. Used only for presence and basic type checks; full JSON Schema validation is not performed.

Returns

dict: Sanitised copy of args.

Raises

ValueError: If a required field from the schema is missing.

TypeError: If a field's value is of the wrong primitive type.

Source code in src/security.py
def validate_tool_args(args: dict[str, Any], schema: dict | None = None) -> dict[str, Any]:
    """Sanitise AI-supplied tool arguments before dispatch.

    - Strips null bytes from all string values.
    - Truncates oversized string values to :data:`_MAX_ARG_STRING_LEN`.
    - Optionally validates *args* against a JSON-schema ``properties`` map to
      ensure required fields are present and types match.

    Parameters
    ----------
    args:
        Raw argument dict supplied by the AI (already JSON-decoded).
    schema:
        Optional JSON Schema dict with a ``"properties"`` key.  Used only for
        presence and basic type checks; full JSON Schema validation is not
        performed.

    Returns
    -------
    dict
        Sanitised copy of *args*.

    Raises
    ------
    ValueError
        If a required field from the schema is missing.
    TypeError
        If a field's value is of the wrong primitive type.
    """
    cleaned: dict[str, Any] = {}
    for key, value in args.items():
        if isinstance(value, str):
            # Strip null bytes
            value = _DANGEROUS_ARG_CHARS_RE.sub("", value)
            # Truncate oversized strings
            if len(value) > _MAX_ARG_STRING_LEN:
                logger.warning(
                    "Tool arg %r truncated from %d to %d chars.",
                    key, len(value), _MAX_ARG_STRING_LEN,
                )
                value = value[:_MAX_ARG_STRING_LEN]
        elif isinstance(value, list):
            # Sanitise string items in lists
            value = [
                _DANGEROUS_ARG_CHARS_RE.sub("", v)[:_MAX_ARG_STRING_LEN]
                if isinstance(v, str) else v
                for v in value
            ]
        cleaned[key] = value

    if schema:
        properties: dict = schema.get("properties", {})
        required: list[str] = schema.get("required", [])
        for field in required:
            if field not in cleaned:
                raise ValueError(
                    f"Tool call is missing required argument: {field!r}"
                )
        for field, field_schema in properties.items():
            if field not in cleaned:
                continue
            expected_type = field_schema.get("type")
            value = cleaned[field]
            _check_json_type(field, value, expected_type)

    return cleaned
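
The helper _check_json_type is called above but not listed on this page. A minimal sketch of what such a check might look like follows, assuming it maps JSON Schema primitive type names to Python types and raises TypeError on a mismatch. The name check_json_type_sketch, the mapping table, and the choice to reject booleans where an integer or number is expected are all assumptions.

```python
from typing import Any, Optional

# Assumed mapping from JSON Schema primitive names to Python types.
JSON_TYPE_MAP = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def check_json_type_sketch(field: str, value: Any, expected_type: Optional[str]) -> None:
    """Raise TypeError if value does not match the expected JSON type."""
    if expected_type is None or expected_type not in JSON_TYPE_MAP:
        return  # no declared type, or one this sketch does not know: accept
    # bool is a subclass of int in Python, so reject it explicitly for numbers.
    if isinstance(value, bool) and expected_type in ("integer", "number"):
        matches = False
    else:
        matches = isinstance(value, JSON_TYPE_MAP[expected_type])
    if not matches:
        raise TypeError(
            f"Tool argument {field!r} should be {expected_type}, "
            f"got {type(value).__name__}"
        )


check_json_type_sketch("count", 3, "integer")     # passes
check_json_type_sketch("path", "/tmp/x", "string")  # passes
try:
    check_json_type_sketch("count", "3", "integer")
    string_rejected = False
except TypeError:
    string_rejected = True
```

Accepting unknown type names keeps the check permissive, which matches the documented scope of presence and basic type checks only.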

mask_secret

mask_secret(value)

Return a masked version of value safe for logging.

Only the first two and last two characters are kept; everything in between is replaced with ***. Values shorter than 8 characters are fully masked.

Examples

>>> mask_secret("sk-abc123xyz")
'sk***yz'
>>> mask_secret("short")
'***'

Source code in src/security.py
def mask_secret(value: str) -> str:
    """Return a masked version of *value* safe for logging.

    Only the first two and last two characters are kept; everything in between
    is replaced with ``***``.  Values shorter than 8 characters are fully
    masked.

    Examples
    --------
    >>> mask_secret("sk-abc123xyz")
    'sk***yz'
    >>> mask_secret("short")
    '***'
    """
    if not value or len(value) < _MIN_SECRET_LEN:
        return _MASK
    if len(value) < 8:
        return _MASK
    return f"{value[:2]}{_MASK}{value[-2:]}"

get_api_key_from_env

get_api_key_from_env(env_var)

Retrieve an API key from an environment variable.

The key is never read from the config file directly — it must always come from the process environment so it is not accidentally committed to version control.

Parameters

env_var: Name of the environment variable to read.

Returns

str: The API key value, or an empty string if the variable is not set.

Source code in src/security.py
def get_api_key_from_env(env_var: str) -> str:
    """Retrieve an API key from an environment variable.

    The key is **never** read from the config file directly — it must always
    come from the process environment so it is not accidentally committed to
    version control.

    Parameters
    ----------
    env_var:
        Name of the environment variable to read.

    Returns
    -------
    str
        The API key value, or an empty string if the variable is not set.
    """
    if not env_var:
        return ""
    value = os.environ.get(env_var, "")
    if value:
        logger.debug("API key loaded from environment variable %r.", env_var)
    else:
        logger.debug("Environment variable %r is not set; no API key used.", env_var)
    return value
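
A short usage sketch follows. The function is a condensed copy of the listing above, and the variable names EXAMPLE_API_KEY and EXAMPLE_UNSET_VAR are placeholders invented for the demonstration.

```python
import logging
import os

logger = logging.getLogger(__name__)


def get_api_key_from_env(env_var: str) -> str:
    # Condensed copy of the listing above.
    if not env_var:
        return ""
    value = os.environ.get(env_var, "")
    if value:
        logger.debug("API key loaded from environment variable %r.", env_var)
    else:
        logger.debug("Environment variable %r is not set; no API key used.", env_var)
    return value


# Placeholder variables for the demonstration only.
os.environ["EXAMPLE_API_KEY"] = "sk-abc123xyz"
os.environ.pop("EXAMPLE_UNSET_VAR", None)

key = get_api_key_from_env("EXAMPLE_API_KEY")
missing = get_api_key_from_env("EXAMPLE_UNSET_VAR")
no_name = get_api_key_from_env("")
```

Callers can treat an empty return value as "no key configured"; only the variable name is logged, never the key itself.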