mlflow_secrets_auth

MLflow Secrets-Backed RequestAuthProvider.

Public API
  • SecretsAuthProviderFactory: Factory provider that delegates to the first enabled backend among Vault, AWS Secrets Manager, and Azure Key Vault.
  • __version__: Package version string (best-effort).

This module also exposes a best-effort __version__ so the CLI info command can display a version even in editable installs where distribution metadata may be unavailable.

SecretsAuthProviderFactory()

Bases: SecretsBackedAuthProvider

Factory that selects and delegates to an enabled provider.

Priority order

1) HashiCorp Vault
2) AWS Secrets Manager
3) Azure Key Vault

If no provider is enabled or instantiation fails, this factory behaves as "disabled" (e.g., returns defaults/None) while preserving MLflow semantics.

Attributes:

Name Type Description
_actual_provider SecretsBackedAuthProvider | None

The lazily-instantiated concrete provider, if any.

Initialize the factory with a default TTL.

Source code in src/mlflow_secrets_auth/__init__.py
def __init__(self) -> None:
    """Initialize the factory with a default TTL."""
    super().__init__("mlflow_secrets_auth", default_ttl=DEFAULT_TTL_SECONDS)
    self._actual_provider: SecretsBackedAuthProvider | None = None
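
The priority order can be pictured as a first-match scan over an ordered list. The following is a minimal sketch under stated assumptions, not the factory's actual code: `pick_provider` and the `enabled` set are hypothetical stand-ins, and the slugs are the ones shown in the MLFLOW_SECRETS_AUTH_ENABLE example in the config module.

```python
from __future__ import annotations

# Provider slugs in the documented priority order.
PRIORITY = ("vault", "aws-secrets-manager", "azure-key-vault")


def pick_provider(enabled: set[str]) -> str | None:
    """Return the highest-priority enabled slug, or None if none is enabled."""
    for slug in PRIORITY:
        if slug in enabled:
            return slug
    return None


# AWS outranks Azure when both are enabled; an empty set means "disabled".
print(pick_provider({"azure-key-vault", "aws-secrets-manager"}))
print(pick_provider(set()))
```

Returning None for the empty case mirrors the "disabled" behaviour described above, where the factory yields no auth object rather than raising.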

base

Base classes and abstractions for MLflow secrets-backed authentication providers.

This module defines
  • Lightweight requests.auth.AuthBase implementations for Bearer, Basic, and custom-header auth.
  • SecretsBackedAuthProvider, an abstract base for MLflow RequestAuthProviders that obtain credentials from secret managers and cache them with a TTL.
Design notes
  • Providers implement _fetch_secret, _get_cache_key, _get_auth_mode, and _get_ttl.
  • Caching is delegated to cached_fetch and TTL validation to validate_ttl.
  • Secrets are parsed centrally via parse_secret_json and must resolve to either:
    • {"token": "..."} OR
    • {"username": "...", "password": "..."}
  • Header name can be configured; "Authorization" is normalized to the canonical header.

All logging goes through safe_log to avoid leaking sensitive values.
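
To make the two accepted secret shapes concrete, here is a rough sketch of how a JSON secret could be classified. `classify_secret` is a hypothetical helper written for illustration; the package's own parse_secret_json may behave differently (for example in how it treats extra fields or whitespace).

```python
from __future__ import annotations

import json


def classify_secret(raw: str) -> dict[str, str]:
    """Parse a JSON secret and keep only the recognised credential fields."""
    data = json.loads(raw)  # malformed JSON raises a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("secret must be a JSON object")

    token = data.get("token")
    if isinstance(token, str) and token:
        return {"token": token}

    username, password = data.get("username"), data.get("password")
    if isinstance(username, str) and isinstance(password, str):
        return {"username": username, "password": password}

    raise ValueError("secret must contain 'token' or 'username' and 'password'")


print(sorted(classify_secret('{"username": "svc", "password": "pw"}')))
```

Raising ValueError for an unusable secret fits the way get_request_auth treats ValueError as a configuration or parsing problem rather than a fatal error.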

BasicAuth(username, password, header_name=DEFAULT_AUTH_HEADER)

Bases: AuthBase

HTTP Basic authentication for requests.

If a non-standard header is configured, the base64 credentials are put into that header.

Attributes:

Name Type Description
username

Basic auth username.

password

Basic auth password.

header_name

Target header (defaults to "Authorization").

Initialize basic authentication.

Parameters:

Name Type Description Default
username str

Username for basic auth.

required
password str

Password for basic auth.

required
header_name str

HTTP header to inject (defaults to "Authorization").

DEFAULT_AUTH_HEADER
Source code in src/mlflow_secrets_auth/base.py
def __init__(self, username: str, password: str, header_name: str = DEFAULT_AUTH_HEADER) -> None:
    """Initialize basic authentication.

    Args:
        username: Username for basic auth.
        password: Password for basic auth.
        header_name: HTTP header to inject (defaults to "Authorization").

    """
    self.username = username
    self.password = password
    self.header_name = header_name
__call__(r)

Attach the basic auth header to the outgoing request.

Source code in src/mlflow_secrets_auth/base.py
def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
    """Attach the basic auth header to the outgoing request."""
    creds = f"{self.username}:{self.password}".encode()
    r.headers[self.header_name] = f"Basic {base64.b64encode(creds).decode()}"
    return r
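
As an illustration of the header value that `__call__` builds, the snippet below repeats the same encoding with only the standard library. `basic_header_value` is a hypothetical helper name, and a plain dict stands in for the request headers.

```python
import base64


def basic_header_value(username: str, password: str) -> str:
    """Build the value placed in the auth header for Basic authentication."""
    creds = f"{username}:{password}".encode()
    return f"Basic {base64.b64encode(creds).decode()}"


headers = {}
headers["Authorization"] = basic_header_value("user", "pass")
print(headers)  # {'Authorization': 'Basic dXNlcjpwYXNz'}
```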

BearerAuth(token, header_name=DEFAULT_AUTH_HEADER)

Bases: AuthBase

Bearer token authentication for requests.

The token is injected as: <header_name>: Bearer <token>

Attributes:

Name Type Description
token

Opaque bearer token.

header_name

Target header (defaults to "Authorization").

Initialize bearer authentication.

Parameters:

Name Type Description Default
token str

Bearer token string.

required
header_name str

HTTP header to inject (defaults to "Authorization").

DEFAULT_AUTH_HEADER
Source code in src/mlflow_secrets_auth/base.py
def __init__(self, token: str, header_name: str = DEFAULT_AUTH_HEADER) -> None:
    """Initialize bearer authentication.

    Args:
        token: Bearer token string.
        header_name: HTTP header to inject (defaults to "Authorization").

    """
    self.token = token
    self.header_name = header_name
__call__(r)

Attach the bearer token header to the outgoing request.

Source code in src/mlflow_secrets_auth/base.py
def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
    """Attach the bearer token header to the outgoing request."""
    r.headers[self.header_name] = f"Bearer {self.token}"
    return r

CustomHeaderAuth(token, header_name)

Bases: AuthBase

Custom header authentication for requests (token placed as-is).

Attributes:

Name Type Description
token

Opaque token to inject.

header_name

Target header.

Initialize custom header authentication.

Parameters:

Name Type Description Default
token str

Token value to inject directly.

required
header_name str

Header name where the token is placed.

required
Source code in src/mlflow_secrets_auth/base.py
def __init__(self, token: str, header_name: str) -> None:
    """Initialize custom header authentication.

    Args:
        token: Token value to inject directly.
        header_name: Header name where the token is placed.

    """
    self.token = token
    self.header_name = header_name
__call__(r)

Attach the opaque token to the configured header.

Source code in src/mlflow_secrets_auth/base.py
def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
    """Attach the opaque token to the configured header."""
    r.headers[self.header_name] = self.token
    return r
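
The difference between BearerAuth and CustomHeaderAuth is only whether the "Bearer " prefix is added. A small sketch, using hypothetical helper functions and plain dicts in place of request headers (the header name "X-Api-Key" is just an example value):

```python
from __future__ import annotations


def bearer_header(token: str, header_name: str = "Authorization") -> dict[str, str]:
    """Bearer mode: the token is prefixed with the 'Bearer ' scheme."""
    return {header_name: f"Bearer {token}"}


def custom_header(token: str, header_name: str) -> dict[str, str]:
    """Custom-header mode: the token is placed in the header as-is."""
    return {header_name: token}


print(bearer_header("abc123"))               # {'Authorization': 'Bearer abc123'}
print(custom_header("abc123", "X-Api-Key"))  # {'X-Api-Key': 'abc123'}
```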

SecretData

Bases: TypedDict

Structured representation of parsed secret material.

SecretsBackedAuthProvider(provider_name, default_ttl=DEFAULT_TTL_SECONDS)

Bases: RequestAuthProvider, ABC

Abstract base class for secrets-backed MLflow auth providers.

Subclasses implement secret retrieval for a specific backend (e.g., Vault, AWS, Azure) and supply configuration inputs (cache key, auth mode, TTL).

This class handles
  • Provider enablement checks.
  • Host allowlisting for get_request_auth.
  • Cache + TTL validation.
  • Secret parsing and Auth object construction.

Parameters:

Name Type Description Default
provider_name str

Stable identifier used for logging and configuration.

required
default_ttl int

Fallback TTL in seconds if configured TTL is invalid.

DEFAULT_TTL_SECONDS

Attributes:

Name Type Description
provider_name

Provider identifier.

default_ttl

Default TTL for cache.

logger

Namespaced logger instance.

Initialize the provider base.

Parameters:

Name Type Description Default
provider_name str

Identifier of the provider (e.g., "vault").

required
default_ttl int

Default cache TTL in seconds.

DEFAULT_TTL_SECONDS
Source code in src/mlflow_secrets_auth/base.py
def __init__(self, provider_name: str, default_ttl: int = DEFAULT_TTL_SECONDS) -> None:
    """Initialize the provider base.

    Args:
        provider_name: Identifier of the provider (e.g., "vault").
        default_ttl: Default cache TTL in seconds.

    """
    self.provider_name = provider_name
    self.default_ttl = default_ttl
    self.logger = setup_logger(f"mlflow_secrets_auth.{provider_name}")
get_auth()

Return a requests Auth object (no URL filtering).

This method is used by MLflow when a per-request URL is not available.

Returns:

Type Description
AuthBase | None

A requests.auth.AuthBase instance or None when disabled/unavailable.

Source code in src/mlflow_secrets_auth/base.py
def get_auth(self) -> requests.auth.AuthBase | None:
    """Return a `requests` Auth object (no URL filtering).

    This method is used by MLflow when a per-request URL is not available.

    Returns:
        A `requests.auth.AuthBase` instance or None when disabled/unavailable.

    """
    if not self._is_enabled():
        safe_log(self.logger, logging.DEBUG, DEBUG_PROVIDER_NOT_ENABLED.format(provider=self.provider_name))
        return None

    try:
        secret_data = self._fetch_secret_cached()
        return None if not secret_data else self._create_auth(secret_data)
    except Exception as e:  # pragma: no cover — defensive guard
        safe_log(self.logger, logging.ERROR, ERROR_UNEXPECTED_PROVIDER.format(provider=self.provider_name, error=e))
        return None
get_name()

Return the provider name (instance method in recent MLflow versions).

Returns:

Type Description
str

Provider name for MLflow plugin discovery.

Source code in src/mlflow_secrets_auth/base.py
def get_name(self) -> str:
    """Return the provider name (instance method in recent MLflow versions).

    Returns:
        Provider name for MLflow plugin discovery.

    """
    return self.provider_name
get_request_auth(url)

Return a requests Auth object for a given MLflow request URL.

Applies host allowlisting to avoid credential leakage.

Parameters:

Name Type Description Default
url str

Full request URL for an MLflow call.

required

Returns:

Type Description
AuthBase | None

A requests.auth.AuthBase instance or None if not allowed/available.

Source code in src/mlflow_secrets_auth/base.py
def get_request_auth(self, url: str) -> requests.auth.AuthBase | None:
    """Return a `requests` Auth object for a given MLflow request URL.

    Applies host allowlisting to avoid credential leakage.

    Args:
        url: Full request URL for an MLflow call.

    Returns:
        A `requests.auth.AuthBase` instance or None if not allowed/available.

    """
    if not self._is_enabled():
        safe_log(self.logger, logging.DEBUG, DEBUG_PROVIDER_NOT_ENABLED.format(provider=self.provider_name))
        return None

    allowed_hosts = get_allowed_hosts()
    if not is_host_allowed(url, allowed_hosts):
        hostname = urlparse(url).hostname or "<unknown>"
        safe_log(
            self.logger,
            logging.INFO,
            INFO_HOST_NOT_ALLOWED.format(hostname=hostname),
        )
        return None

    try:
        secret_data = self._fetch_secret_cached()
        if not secret_data:
            safe_log(self.logger, logging.WARNING, WARNING_FETCH_FAILED.format(provider=self.provider_name))
            return None
        return self._create_auth(secret_data)
    except ValueError as e:
        # Configuration or parsing error — not fatal to the request.
        safe_log(self.logger, logging.WARNING, WARNING_CONFIG_ERROR.format(provider=self.provider_name, error=e))
        return None
    except Exception as e:  # pragma: no cover — defensive
        safe_log(self.logger, logging.ERROR, ERROR_UNEXPECTED_PROVIDER.format(provider=self.provider_name, error=e))
        return None

cache

TTL cache implementation for secrets.

Provides a lightweight, thread-safe cache with monotonic-clock-based TTLs and a simple decorator (cached_fetch) to memoize zero-argument callables.

Design goals
  • Monotonic time to avoid issues when the wall clock changes.
  • Thread safety via RLock.
  • No caching of failures: if the wrapped callable raises, the wrapper returns None and stores nothing.
  • Global cache instance for convenience, with helpers to clear and inspect size.
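
The first two design goals can be demonstrated with a stripped-down cache whose clock is injectable, so expiry can be exercised without sleeping. This is an illustrative sketch only: `TinyTTLCache` and its `clock` parameter are hypothetical and are not part of the package.

```python
from __future__ import annotations

import threading
import time
from typing import Any, Callable


class TinyTTLCache:
    """Illustrative TTL cache; not the package's TTLCache."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # monotonic by default, so wall-clock jumps are harmless
        self._lock = threading.RLock()
        self._items: dict[str, tuple[Any, float]] = {}

    def set(self, key: str, value: Any, ttl: float) -> None:
        with self._lock:
            self._items[key] = (value, self._clock() + ttl)

    def get(self, key: str) -> Any | None:
        with self._lock:
            entry = self._items.get(key)
            if entry is None:
                return None
            value, expiry = entry
            if self._clock() > expiry:
                del self._items[key]  # drop the stale entry on read
                return None
            return value


now = [100.0]  # fake clock value, advanced by hand
cache = TinyTTLCache(clock=lambda: now[0])
cache.set("vault:token", "s3cr3t", ttl=30)
print(cache.get("vault:token"))  # still live
now[0] += 31
print(cache.get("vault:token"))  # expired, so None
```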

TTLCache()

Thread-safe TTL cache (monotonic-clock based).

Initialize an empty TTL cache with thread safety.

Source code in src/mlflow_secrets_auth/cache.py
def __init__(self) -> None:
    """Initialize an empty TTL cache with thread safety."""
    self._cache: dict[str, tuple[Any, float]] = {}
    self._lock = threading.RLock()
clear()

Clear all cached items.

Source code in src/mlflow_secrets_auth/cache.py
def clear(self) -> None:
    """Clear all cached items."""
    with self._lock:
        self._cache.clear()
delete(key)

Remove a key from the cache.

Parameters:

Name Type Description Default
key str

Cache key to remove.

required
Source code in src/mlflow_secrets_auth/cache.py
def delete(self, key: str) -> None:
    """Remove a key from the cache.

    Args:
        key: Cache key to remove.

    """
    with self._lock:
        self._cache.pop(key, None)
get(key)

Get a value from the cache if present and not expired.

Parameters:

Name Type Description Default
key str

Cache key.

required

Returns:

Type Description
Any | None

The cached value if present and valid, otherwise None.

Source code in src/mlflow_secrets_auth/cache.py
def get(self, key: str) -> Any | None:
    """Get a value from the cache if present and not expired.

    Args:
        key: Cache key.

    Returns:
        The cached value if present and valid, otherwise None.

    """
    with self._lock:
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if self._now() > expiry:
            self._cache.pop(key, None)
            return None
        return value
invalidate_prefix(prefix)

Remove all keys starting with a prefix.

Useful for provider-wide invalidation using e.g. f"{provider_name}:".

Parameters:

Name Type Description Default
prefix str

Prefix to match.

required
Source code in src/mlflow_secrets_auth/cache.py
def invalidate_prefix(self, prefix: str) -> None:
    """Remove all keys starting with a prefix.

    Useful for provider-wide invalidation using e.g. ``f"{provider_name}:"``.

    Args:
        prefix: Prefix to match.

    """
    with self._lock:
        to_delete = [k for k in self._cache if k.startswith(prefix)]
        for k in to_delete:
            self._cache.pop(k, None)
set(key, value, ttl_seconds)

Set a value in the cache with a TTL.

Non-positive or sub-minimum TTLs are treated as "no caching" (the key is removed). TTLs larger than MAX_TTL_SECONDS are capped.

Parameters:

Name Type Description Default
key str

Cache key.

required
value Any

Value to store.

required
ttl_seconds float

Time-to-live in seconds.

required
Source code in src/mlflow_secrets_auth/cache.py
def set(self, key: str, value: Any, ttl_seconds: float) -> None:
    """Set a value in the cache with a TTL.

    Non-positive or sub-minimum TTLs are treated as "no caching"
    (the key is removed). TTLs larger than MAX_TTL_SECONDS are capped.

    Args:
        key: Cache key.
        value: Value to store.
        ttl_seconds: Time-to-live in seconds.

    """
    with self._lock:
        if ttl_seconds < MIN_TTL_SECONDS:
            self._cache.pop(key, None)
            return
        ttl = min(float(ttl_seconds), float(MAX_TTL_SECONDS))
        self._cache[key] = (value, self._now() + ttl)
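
The TTL rule applied by set can be isolated as a pure function. In this sketch `effective_ttl` is a hypothetical helper, and the bounds of 1 and 3600 seconds are placeholder assumptions; the real MIN_TTL_SECONDS and MAX_TTL_SECONDS values are not shown in this section.

```python
from __future__ import annotations


def effective_ttl(ttl_seconds: float, min_ttl: float = 1.0, max_ttl: float = 3600.0) -> float | None:
    """Return the TTL that would be stored, or None when nothing is cached."""
    if ttl_seconds < min_ttl:
        return None  # below the minimum: treated as "no caching"
    return min(float(ttl_seconds), float(max_ttl))  # above the maximum: capped


print(effective_ttl(0))      # None
print(effective_ttl(60))     # 60.0
print(effective_ttl(10**6))  # 3600.0
```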
size()

Return the current cache size, pruning expired entries first.

Returns:

Type Description
int

Number of live (non-expired) entries.

Source code in src/mlflow_secrets_auth/cache.py
def size(self) -> int:
    """Return the current cache size, pruning expired entries first.

    Returns:
        Number of live (non-expired) entries.

    """
    with self._lock:
        now = self._now()
        to_delete = [k for k, (_, exp) in self._cache.items() if now > exp]
        for k in to_delete:
            self._cache.pop(k, None)
        return len(self._cache)

cached_fetch(cache_key, ttl_seconds=DEFAULT_TTL_SECONDS)

Cache a zero-argument function's result with a TTL.

Exceptions raised by the wrapped function are swallowed and result in None, which is not cached. Successful non-None results are cached.

Parameters:

Name Type Description Default
cache_key str

Unique cache key for the function result.

required
ttl_seconds int

Time-to-live for the cached value.

DEFAULT_TTL_SECONDS

Returns:

Type Description
Callable[[Callable[[], T]], Callable[[], T | None]]

A decorator that wraps a Callable[[], T] and returns Callable[[], Optional[T]].

Source code in src/mlflow_secrets_auth/cache.py
def cached_fetch(cache_key: str, ttl_seconds: int = DEFAULT_TTL_SECONDS) -> Callable[[Callable[[], T]], Callable[[], T | None]]:
    """Cache a zero-argument function's result with a TTL.

    Exceptions raised by the wrapped function are swallowed and result in `None`,
    which is not cached. Successful non-None results are cached.

    Args:
        cache_key: Unique cache key for the function result.
        ttl_seconds: Time-to-live for the cached value.

    Returns:
        A decorator that wraps a `Callable[[], T]` and returns `Callable[[], Optional[T]]`.

    """

    def decorator(fetch_func: Callable[[], T]) -> Callable[[], T | None]:
        def wrapper() -> T | None:
            cached_value = _global_cache.get(cache_key)
            if cached_value is not None:
                # Typing note: caller-provided T is preserved by construction.
                return cached_value  # type: ignore[return-value]

            try:
                value = fetch_func()
            except Exception:
                # Do not cache failures; return None to the caller.
                return None

            if value is not None:
                _global_cache.set(cache_key, value, ttl_seconds)
            return value

        return wrapper

    return decorator
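
The "failures are not cached" behaviour is easiest to see with a fetch function that fails once and then succeeds. The sketch below is a simplified stand-in, assuming a plain dict instead of the global TTL cache; `cached_fetch_sketch` and `_store` are hypothetical names and TTL handling is omitted.

```python
from __future__ import annotations

from typing import Callable, TypeVar

T = TypeVar("T")
_store: dict[str, object] = {}  # stand-in for the global cache, without TTLs


def cached_fetch_sketch(cache_key: str) -> Callable[[Callable[[], T]], Callable[[], T | None]]:
    def decorator(fetch: Callable[[], T]) -> Callable[[], T | None]:
        def wrapper() -> T | None:
            cached = _store.get(cache_key)
            if cached is not None:
                return cached  # type: ignore[return-value]
            try:
                value = fetch()
            except Exception:
                return None  # failure: nothing is stored
            if value is not None:
                _store[cache_key] = value
            return value

        return wrapper

    return decorator


calls = {"n": 0}


@cached_fetch_sketch("demo:secret")
def fetch() -> str:
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("backend unavailable")
    return "token-value"


first = fetch()   # failure: returns None, nothing cached
second = fetch()  # success: fetched and cached
third = fetch()   # served from the cache, fetch body not run again
print(first, second, third, calls["n"])  # None token-value token-value 2
```

Because a failed call leaves the cache empty, the next call retries the backend immediately instead of serving a cached failure.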

clear_cache()

Clear the global cache instance.

Source code in src/mlflow_secrets_auth/cache.py
def clear_cache() -> None:
    """Clear the global cache instance."""
    _global_cache.clear()

delete_cache_key(key)

Remove a single cache entry by key.

Parameters:

Name Type Description Default
key str

Cache key to remove.

required
Source code in src/mlflow_secrets_auth/cache.py
def delete_cache_key(key: str) -> None:
    """Remove a single cache entry by key.

    Args:
        key: Cache key to remove.

    """
    _global_cache.delete(key)

get_cache_size()

Get the current size of the global cache (after pruning).

Source code in src/mlflow_secrets_auth/cache.py
def get_cache_size() -> int:
    """Get the current size of the global cache (after pruning)."""
    return _global_cache.size()

cli

Command-line interface (CLI) for MLflow Secrets Auth.

Subcommands
  • info - Show version, enabled providers, and configuration snapshot.
  • doctor - Run diagnostics against the configured provider.

doctor_command(args)

Run diagnostics against the configured provider.

Steps

1) Resolve enabled provider.
2) Validate provider configuration (auth mode, TTL, header).
3) Fetch secret and construct an auth object.
4) Optional dry-run: issue a HEAD request to the given URL's origin.

Parameters:

Name Type Description Default
args Namespace

Parsed CLI args (supports --dry-run URL).

required

Returns:

Type Description
int

Process exit code (0 on success, non-zero on error).

Source code in src/mlflow_secrets_auth/cli.py
def doctor_command(args: argparse.Namespace) -> int:
    """Run diagnostics against the configured provider.

    Steps:
      1) Resolve enabled provider.
      2) Validate provider configuration (auth mode, TTL, header).
      3) Fetch secret and construct an auth object.
      4) Optional dry-run: issue a HEAD request to the given URL's origin.

    Args:
        args: Parsed CLI args (supports `--dry-run` URL).

    Returns:
        Process exit code (0 on success, non-zero on error).

    """
    setup_logger("mlflow_secrets_auth.cli")

    _print_header(CLI_HEADER_DOCTOR)

    provider_name, provider = get_enabled_provider()
    if provider_name is None or provider is None:
        enabled_providers = [name for name in PROVIDERS if is_provider_enabled(name)]
        if not enabled_providers:
            print(f"{STATUS_ERROR} {DOCTOR_NO_PROVIDERS_ENABLED}")
            print(f"  {DOCTOR_AVAILABLE_PROVIDERS.format(providers=', '.join(PROVIDERS.keys()))}")
        else:
            print(f"{STATUS_ERROR} {DOCTOR_FOUND_ENABLED.format(providers=', '.join(enabled_providers))}")
            print(f"  {DOCTOR_INIT_FAILED}")
        return EXIT_ERROR

    print(f"{STATUS_SUCCESS} {DOCTOR_PROVIDER_OK.format(provider=provider_name)}")

    # Config snapshot
    try:
        auth_mode = provider._get_auth_mode()  # noqa: SLF001
        print(f"{STATUS_SUCCESS} {DOCTOR_AUTH_MODE_OK.format(mode=auth_mode)}")

        auth_header = get_auth_header_name()
        print(f"{STATUS_SUCCESS} {DOCTOR_AUTH_HEADER_OK.format(header=auth_header)}")

        ttl = provider._get_ttl()  # noqa: SLF001
        print(f"{STATUS_SUCCESS} {DOCTOR_TTL_OK.format(ttl=ttl)}")

        cache_size = get_cache_size()
        print(f"{STATUS_SUCCESS} {DOCTOR_CACHE_SIZE_OK.format(size=cache_size)}")

        allowed_hosts = get_allowed_hosts()
        if allowed_hosts:
            print(f"{STATUS_SUCCESS} {DOCTOR_ALLOWED_HOSTS_OK.format(hosts=', '.join(allowed_hosts))}")
        else:
            print(f"{STATUS_SUCCESS} {DOCTOR_ALL_HOSTS_OK}")
    except Exception as e:
        print(f"{STATUS_ERROR} {DOCTOR_CONFIG_ERROR.format(error=str(e))}")
        return EXIT_ERROR

    # Test secret fetch + auth construction
    try:
        print(f"{STATUS_TESTING} {DOCTOR_TESTING_SECRET_FETCH}")
        secret_data = provider._fetch_secret_cached()  # noqa: SLF001
        if not secret_data:
            print(f"{STATUS_ERROR} {DOCTOR_SECRET_FETCH_FAILED}")
            return EXIT_ERROR

        print(f"{STATUS_SUCCESS} {DOCTOR_SECRET_FETCH_OK}")

        try:
            print(f"{STATUS_TESTING} {DOCTOR_TESTING_AUTH_CONSTRUCTION}")
            auth = provider._create_auth(secret_data)  # noqa: SLF001
            print(f"{STATUS_SUCCESS} {DOCTOR_AUTH_CONSTRUCTION_OK}")
            print(f"  {DOCTOR_AUTH_TYPE.format(auth_type=type(auth).__name__)}")
        except Exception as e:
            print(f"{STATUS_ERROR} {DOCTOR_AUTH_CONSTRUCTION_FAILED.format(error=str(e))}")
            return EXIT_ERROR
    except Exception as e:
        print(f"{STATUS_ERROR} {DOCTOR_SECRET_FETCH_ERROR.format(error=str(e))}")
        return EXIT_ERROR

    # Optional dry-run against a URL
    if args.dry_run:
        print(f"{STATUS_TESTING} {DOCTOR_DRY_RUN_TESTING.format(url=args.dry_run)}")

        parsed = urlparse(args.dry_run)
        if not parsed.scheme or not parsed.netloc:
            print(f"{STATUS_ERROR} {DOCTOR_INVALID_URL}")
            return EXIT_ERROR

        allowed_hosts = get_allowed_hosts()
        if allowed_hosts and not is_host_allowed(args.dry_run, allowed_hosts):
            print(f"{STATUS_ERROR} {DOCTOR_HOST_NOT_ALLOWED.format(host=parsed.netloc, allowed_hosts=', '.join(allowed_hosts))}")
            return EXIT_ERROR

        try:
            auth = provider.get_request_auth(args.dry_run)
            if auth is None:
                print(f"{STATUS_ERROR} {DOCTOR_NO_AUTH_FOR_URL}")
                return EXIT_ERROR

            origin = f"{parsed.scheme}://{parsed.netloc}/"
            print(f"{STATUS_TESTING} {DOCTOR_MAKING_REQUEST.format(origin=origin)}")

            try:
                response = requests.head(origin, auth=auth, timeout=DEFAULT_REQUEST_TIMEOUT, allow_redirects=True)
                print(f"{STATUS_SUCCESS} {DOCTOR_REQUEST_SUCCESS.format(status_code=response.status_code)}")
            except requests.exceptions.RequestException as e:
                print(f"{STATUS_ERROR} {DOCTOR_REQUEST_FAILED.format(error=str(e))}")
                # Don't return error code for network issues in dry-run

        except Exception as e:
            print(f"{STATUS_ERROR} {DOCTOR_DRY_RUN_FAILED.format(error=str(e))}")
            return EXIT_ERROR

    print(f"\n{STATUS_SUCCESS} {DOCTOR_ALL_PASSED}")
    return EXIT_SUCCESS
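
The dry-run step sends its HEAD request to the URL's origin rather than the full URL. The derivation can be sketched on its own; `origin_of` is a hypothetical helper that mirrors the validation and f-string used in doctor_command above.

```python
from __future__ import annotations

from urllib.parse import urlparse


def origin_of(url: str) -> str | None:
    """Return scheme://host[:port]/ for a URL, or None if it is not absolute."""
    parsed = urlparse(url)
    if not parsed.scheme or not parsed.netloc:
        return None  # same check that makes the doctor command reject the URL
    return f"{parsed.scheme}://{parsed.netloc}/"


print(origin_of("https://mlflow.example.com:5000/api/2.0/mlflow/experiments/search?x=1"))
print(origin_of("mlflow.example.com"))  # no scheme, so None
```

Path and query string are dropped, so the probe does not touch a specific MLflow endpoint.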

get_enabled_provider()

Return the first enabled provider as (name, instance), or (None, None).

Returns:

Type Description
ProviderTuple

Tuple of provider name and instance, or (None, None) if none enabled.

Source code in src/mlflow_secrets_auth/cli.py
def get_enabled_provider() -> ProviderTuple:
    """Return the first enabled provider as (name, instance), or (None, None).

    Returns:
        Tuple of provider name and instance, or (None, None) if none enabled.

    """
    for name, cls in PROVIDERS.items():
        if is_provider_enabled(name):
            try:
                return name, cls()  # type: ignore[call-arg]  # compatible ctor
            except Exception:  # pragma: no cover — defensive
                return name, None
    return None, None

info_command(_)

Show plugin version and configuration snapshot.

Returns:

Type Description
int

Process exit code (0 on success, non-zero on error).

Source code in src/mlflow_secrets_auth/cli.py
def info_command(_: argparse.Namespace) -> int:
    """Show plugin version and configuration snapshot.

    Returns:
        Process exit code (0 on success, non-zero on error).

    """
    setup_logger("mlflow_secrets_auth.cli")

    _print_header(CLI_HEADER_INFO)

    # Version
    try:
        version = importlib.metadata.version(PACKAGE_NAME)
        print(f"{STATUS_SUCCESS} {INFO_VERSION.format(version=version)}")
    except importlib.metadata.PackageNotFoundError:
        # Fallback for editable installs if distribution metadata is absent
        with contextlib.suppress(Exception):
            print(f"{STATUS_SUCCESS} {INFO_VERSION_DEV}")

    # Providers
    enabled_providers = [name for name in PROVIDERS if is_provider_enabled(name)]
    if enabled_providers:
        print(f"{STATUS_SUCCESS} {INFO_ENABLED_PROVIDERS.format(providers=', '.join(enabled_providers))}")
    else:
        print(f"{STATUS_ERROR} {INFO_NO_ENABLED_PROVIDERS}")

    all_providers = list(PROVIDERS.keys())
    disabled_providers = [name for name in all_providers if name not in enabled_providers]
    if disabled_providers:
        print(f"  {INFO_DISABLED_PROVIDERS.format(providers=', '.join(disabled_providers))}")

    # Config snapshot
    allowed_hosts = get_allowed_hosts()
    if allowed_hosts:
        print(f"{STATUS_SUCCESS} {INFO_ALLOWED_HOSTS.format(hosts=', '.join(allowed_hosts))}")
    else:
        print(f"{STATUS_SUCCESS} {INFO_ALL_HOSTS_ALLOWED}")

    auth_header = get_auth_header_name()
    print(f"{STATUS_SUCCESS} {INFO_AUTH_HEADER.format(header=auth_header)}")

    cache_size = get_cache_size()
    print(f"{STATUS_SUCCESS} {INFO_CACHE_SIZE.format(size=cache_size)}")

    return EXIT_SUCCESS

main()

Run the CLI entry point.

Returns:

Type Description
int

Process exit code (0 on success, non-zero on error).

Source code in src/mlflow_secrets_auth/cli.py
def main() -> int:
    """Run the CLI entry point.

    Returns:
        Process exit code (0 on success, non-zero on error).

    """
    parser = argparse.ArgumentParser(description=CLI_DESCRIPTION)
    subparsers = parser.add_subparsers(dest="command", help=CLI_HELP_COMMANDS)

    # doctor
    doctor_parser = subparsers.add_parser(CLI_COMMAND_DOCTOR, help=CLI_HELP_DOCTOR)
    doctor_parser.add_argument(
        CLI_ARG_DRY_RUN,
        metavar="URL",
        help=CLI_HELP_DRY_RUN,
    )

    # info
    subparsers.add_parser(CLI_COMMAND_INFO, help=CLI_HELP_INFO)

    args = parser.parse_args()

    if args.command == CLI_COMMAND_DOCTOR:
        return doctor_command(args)
    if args.command == CLI_COMMAND_INFO:
        return info_command(args)

    parser.print_help()
    return EXIT_ERROR
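
For readers unfamiliar with argparse subcommands, the parser layout can be reproduced in isolation. This sketch assumes the CLI constants resolve to the literal strings "doctor", "info", and "--dry-run" (consistent with the subcommand list and docstrings above, but not confirmed by the listing); `build_parser` and the `prog` name are hypothetical.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the same two subcommands as the CLI."""
    parser = argparse.ArgumentParser(prog="demo")
    subparsers = parser.add_subparsers(dest="command")
    doctor = subparsers.add_parser("doctor")
    doctor.add_argument("--dry-run", metavar="URL")  # stored as args.dry_run
    subparsers.add_parser("info")
    return parser


args = build_parser().parse_args(["doctor", "--dry-run", "https://mlflow.example.com"])
print(args.command, args.dry_run)
```

When no subcommand is given, `args.command` is None, which is the case where main prints help and returns a non-zero exit code.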

config

Configuration utilities for MLflow secrets auth providers.

This module centralizes environment-driven configuration and safe redaction helpers.

Key env vars
  • MLFLOW_SECRETS_ALLOWED_HOSTS: Comma-separated host allowlist.
  • MLFLOW_AUTH_HEADER_NAME: Custom header for auth (defaults to "Authorization").
  • MLFLOW_SECRETS_LOG_LEVEL: Logging level (defaults to "INFO").
  • MLFLOW_SECRETS_AUTH_ENABLE: Comma-separated list of enabled providers.
  • MLFLOW_SECRETS_AUTH_ENABLE_<PROVIDER>: Per-provider boolean toggle (e.g., <PROVIDER> = AWS_SECRETS_MANAGER).

get_allowed_hosts()

Return the host allowlist from MLFLOW_SECRETS_ALLOWED_HOSTS.

Supports both exact hostnames and wildcard patterns using shell-style globbing.

Examples:

MLFLOW_SECRETS_ALLOWED_HOSTS="mlflow.example.com,*.corp.example.com"
MLFLOW_SECRETS_ALLOWED_HOSTS="api.prod.com,*.staging.com,localhost"

Wildcard patterns
  • "*.corp.example.com" matches any subdomain of corp.example.com
  • "mlflow.*.com" matches mlflow with any middle component
  • "api-*" matches hostnames starting with "api-"

Returns:

Type Description
list[str] | None

A list of hostname patterns, or None if not configured.

Source code in src/mlflow_secrets_auth/config.py
def get_allowed_hosts() -> list[str] | None:
    """Return the host allowlist from MLFLOW_SECRETS_ALLOWED_HOSTS.

    Supports both exact hostnames and wildcard patterns using shell-style globbing.

    Examples:
        MLFLOW_SECRETS_ALLOWED_HOSTS="mlflow.example.com,*.corp.example.com"
        MLFLOW_SECRETS_ALLOWED_HOSTS="api.prod.com,*.staging.com,localhost"

    Wildcard patterns:
        - "*.corp.example.com" matches any subdomain of corp.example.com
        - "mlflow.*.com" matches mlflow with any middle component
        - "api-*" matches hostnames starting with "api-"

    Returns:
        A list of hostname patterns, or None if not configured.

    """
    hosts_str = get_env_var(ENV_ALLOWED_HOSTS)
    if not hosts_str:
        return None
    hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
    return hosts or None
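
How such patterns might be applied to a request URL can be sketched with the standard library's shell-style matcher. `host_allowed` is a hypothetical function and may differ from the package's is_host_allowed; in particular, treating a missing allowlist as "allow every host" is an assumption drawn from the CLI messages, not from the matcher's source.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from urllib.parse import urlparse


def host_allowed(url: str, patterns: list[str] | None) -> bool:
    """Check a URL's hostname against shell-style allowlist patterns."""
    if not patterns:
        return True  # assumption: no allowlist configured means no restriction
    host = (urlparse(url).hostname or "").lower()
    return any(fnmatchcase(host, pattern.lower()) for pattern in patterns)


patterns = ["mlflow.example.com", "*.corp.example.com", "api-*"]
print(host_allowed("https://team.corp.example.com/api", patterns))  # True
print(host_allowed("https://corp.example.com/api", patterns))       # False
```

Note that "*.corp.example.com" matches subdomains only; the bare domain needs its own entry.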

get_auth_header_name()

Return the configured auth header name.

Defaults to "Authorization" when MLFLOW_AUTH_HEADER_NAME is unset.

Returns:

Type Description
str

Header name as a string.

Source code in src/mlflow_secrets_auth/config.py
def get_auth_header_name() -> str:
    """Return the configured auth header name.

    Defaults to "Authorization" when MLFLOW_AUTH_HEADER_NAME is unset.

    Returns:
        Header name as a string.

    """
    return get_env_var(ENV_AUTH_HEADER_NAME, DEFAULT_AUTH_HEADER) or DEFAULT_AUTH_HEADER

get_env_bool(name, default=False)

Return an environment variable parsed as a boolean.

Recognized truthy values (case-insensitive): {"1", "true", "yes", "on"}.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default bool

Fallback when the variable is unset.

False

Returns:

Type Description
bool

Parsed boolean value.

Source code in src/mlflow_secrets_auth/config.py
def get_env_bool(name: str, default: bool = False) -> bool:
    """Return an environment variable parsed as a boolean.

    Recognized truthy values (case-insensitive): {"1", "true", "yes", "on"}.

    Args:
        name: Environment variable name.
        default: Fallback when the variable is unset.

    Returns:
        Parsed boolean value.

    """
    value = get_env_var(name)
    if value is None:
        return default
    return value.strip().lower() in TRUTHY_VALUES
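
One consequence of this logic is that a variable that is set but empty counts as False instead of falling back to the default. The sketch below reproduces the rule on plain values; `parse_bool` is a hypothetical helper, and `TRUTHY` copies the set listed in the docstring.

```python
from __future__ import annotations

TRUTHY = {"1", "true", "yes", "on"}


def parse_bool(value: str | None, default: bool = False) -> bool:
    """Apply the documented truthy rule to a raw environment value."""
    if value is None:
        return default  # unset: use the fallback
    return value.strip().lower() in TRUTHY


print(parse_bool(" Yes "))             # True
print(parse_bool("0"))                 # False
print(parse_bool(None, default=True))  # True
print(parse_bool("", default=True))    # False: set but empty is not "unset"
```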

get_env_int(name, default)

Return an environment variable parsed as int.

On parsing error or if unset, returns default.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default int

Fallback value.

required

Returns:

Type Description
int

Parsed integer or default.

Source code in src/mlflow_secrets_auth/config.py
def get_env_int(name: str, default: int) -> int:
    """Return an environment variable parsed as int.

    On parsing error or if unset, returns `default`.

    Args:
        name: Environment variable name.
        default: Fallback value.

    Returns:
        Parsed integer or `default`.

    """
    value = get_env_var(name)
    if value is None:
        return default
    try:
        return int(value.strip())
    except (TypeError, ValueError):
        return default

get_env_var(name, default=None)

Return an environment variable or a default.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default str | None

Value to return if not set.

None

Returns:

Type Description
str | None

The environment value as a string, or default when unset.

Source code in src/mlflow_secrets_auth/config.py
def get_env_var(name: str, default: str | None = None) -> str | None:
    """Return an environment variable or a default.

    Args:
        name: Environment variable name.
        default: Value to return if not set.

    Returns:
        The environment value as a string, or `default` when unset.

    """
    return os.environ.get(name, default)

get_log_level()

Return the configured log level for secrets auth.

Defaults to "INFO" and uppercases the value for consistency.

Returns:

Type Description
str

Uppercased logging level string (e.g., "INFO", "DEBUG").

Source code in src/mlflow_secrets_auth/config.py
def get_log_level() -> str:
    """Return the configured log level for secrets auth.

    Defaults to "INFO" and uppercases the value for consistency.

    Returns:
        Uppercased logging level string (e.g., "INFO", "DEBUG").

    """
    return (get_env_var(ENV_LOG_LEVEL, DEFAULT_LOG_LEVEL) or DEFAULT_LOG_LEVEL).upper()

is_provider_enabled(provider_name)

Return whether a specific provider is enabled.

Two mechanisms

1) Global list: MLFLOW_SECRETS_AUTH_ENABLE="vault,aws-secrets-manager,azure-key-vault"
2) Per-provider boolean: MLFLOW_SECRETS_AUTH_ENABLE_<PROVIDER>=true, e.g. MLFLOW_SECRETS_AUTH_ENABLE_AWS_SECRETS_MANAGER=true

Parameters:

Name Type Description Default
provider_name str

Provider slug (case-insensitive), e.g. "vault".

required

Returns:

Type Description
bool

True if enabled via either mechanism, False otherwise.

Source code in src/mlflow_secrets_auth/config.py
def is_provider_enabled(provider_name: str) -> bool:
    """Return whether a specific provider is enabled.

    Two mechanisms:
      1) Global list: MLFLOW_SECRETS_AUTH_ENABLE="vault,aws-secrets-manager,azure-key-vault"
      2) Per-provider boolean: MLFLOW_SECRETS_AUTH_ENABLE_<PROVIDER>=true
         e.g. MLFLOW_SECRETS_AUTH_ENABLE_AWS_SECRETS_MANAGER=true

    Args:
        provider_name: Provider slug (case-insensitive), e.g. "vault".

    Returns:
        True if enabled via either mechanism, False otherwise.

    """
    # Global list
    global_enable = get_env_var(ENV_AUTH_ENABLE, "") or ""
    enabled = {p.strip().lower() for p in global_enable.split(",") if p.strip()}
    if provider_name.strip().lower() in enabled:
        return True

    # Provider-specific toggle
    env_key = f"{ENV_AUTH_ENABLE_PREFIX}{provider_name.upper().replace('-', '_')}"
    return get_env_bool(env_key, False)
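
The two mechanisms can be sketched as below. This is an illustration, not the library code: the constant values are inferred from the docstring, and the `env` mapping parameter is a hypothetical addition so the example does not touch the real process environment.

```python
from collections.abc import Mapping

# Assumed constant values, inferred from the docstring above.
ENV_AUTH_ENABLE = "MLFLOW_SECRETS_AUTH_ENABLE"
ENV_AUTH_ENABLE_PREFIX = "MLFLOW_SECRETS_AUTH_ENABLE_"
TRUTHY_VALUES = {"1", "true", "yes", "on"}


def is_provider_enabled(provider_name: str, env: Mapping[str, str]) -> bool:
    """Same logic as the listing, but reading from an explicit mapping."""
    # Mechanism 1: comma-separated global list, compared case-insensitively.
    raw = env.get(ENV_AUTH_ENABLE, "")
    enabled = {p.strip().lower() for p in raw.split(",") if p.strip()}
    if provider_name.strip().lower() in enabled:
        return True

    # Mechanism 2: per-provider toggle; dashes in the slug become underscores.
    env_key = f"{ENV_AUTH_ENABLE_PREFIX}{provider_name.upper().replace('-', '_')}"
    value = env.get(env_key)
    return value is not None and value.strip().lower() in TRUTHY_VALUES


print(is_provider_enabled("vault", {"MLFLOW_SECRETS_AUTH_ENABLE": "Vault, aws-secrets-manager"}))
print(is_provider_enabled(
    "aws-secrets-manager",
    {"MLFLOW_SECRETS_AUTH_ENABLE_AWS_SECRETS_MANAGER": "true"},
))
print(is_provider_enabled("azure-key-vault", {"MLFLOW_SECRETS_AUTH_ENABLE": "vault"}))
```

The first two calls print True and the third prints False, since `azure-key-vault` appears in neither the list nor a per-provider toggle.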

mask_secret(value, mask_char=DEFAULT_MASK_CHAR, show_chars=DEFAULT_SHOW_CHARS)

Mask a secret value for safe logging.

Examples:

>>> mask_secret("abcd1234")
'ab****34'
>>> mask_secret("ab")
'***'

Parameters:

Name Type Description Default
value str

Secret value to mask.

required
mask_char str

Masking character (default '*').

DEFAULT_MASK_CHAR
show_chars int

Number of leading and trailing chars to keep (default 4).

DEFAULT_SHOW_CHARS

Returns:

Type Description
str

Masked representation with the center portion obfuscated.

Source code in src/mlflow_secrets_auth/config.py
def mask_secret(value: str, mask_char: str = DEFAULT_MASK_CHAR, show_chars: int = DEFAULT_SHOW_CHARS) -> str:
    """Mask a secret value for safe logging.

    Examples:
        >>> mask_secret("abcd1234")
        'ab****34'
        >>> mask_secret("ab")
        '***'

    Args:
        value: Secret value to mask.
        mask_char: Masking character (default '*').
        show_chars: Number of leading and trailing chars to keep (default 4).

    Returns:
        Masked representation with the center portion obfuscated.

    """
    if not value:
        return mask_char * 8

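    # Clamp negative show_chars to zero
    show = max(0, int(show_chars))
    if show == 0:
        # value[-0:] would return the whole secret, so mask everything instead
        return mask_char * 8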

    if len(value) <= show:
        return mask_char * max(3, len(value))
    if len(value) <= show * 2:
        # Keep a small preview while masking the middle
        keep = min(2, len(value))
        return f"{value[:keep]}{mask_char * 4}{value[-keep:]}"
    return f"{value[:show]}{mask_char * 8}{value[-show:]}"
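
To make the length tiers concrete, here is a small sketch that follows the branches of the listing with the defaults fixed. It assumes `DEFAULT_MASK_CHAR` is `'*'` and `DEFAULT_SHOW_CHARS` is 4, as the parameter descriptions state; `mask_with_defaults` is a hypothetical name.

```python
MASK_CHAR = "*"   # assumed value of DEFAULT_MASK_CHAR
SHOW_CHARS = 4    # assumed value of DEFAULT_SHOW_CHARS


def mask_with_defaults(value: str) -> str:
    """Follow the branches of the listing above with the default arguments."""
    if not value:
        return MASK_CHAR * 8
    if len(value) <= SHOW_CHARS:
        # Very short: reveal nothing.
        return MASK_CHAR * max(3, len(value))
    if len(value) <= SHOW_CHARS * 2:
        # Short: keep only a two-character preview on each side.
        return f"{value[:2]}{MASK_CHAR * 4}{value[-2:]}"
    # Long enough: keep SHOW_CHARS characters on each side.
    return f"{value[:SHOW_CHARS]}{MASK_CHAR * 8}{value[-SHOW_CHARS:]}"


for secret in ["", "abc", "abcdef", "abcdef123456"]:
    print(repr(secret), "->", mask_with_defaults(secret))
# '' -> ********
# 'abc' -> ***
# 'abcdef' -> ab****ef
# 'abcdef123456' -> abcd********3456
```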

redact_sensitive_data(text)

Redact common credential patterns from text.

Safely handles patterns with different group counts. Intended for logs and messages.

Parameters:

Name Type Description Default
text str

Input string possibly containing sensitive material.

required

Returns:

Type Description
str

Redacted string with secrets masked.

Source code in src/mlflow_secrets_auth/config.py
def redact_sensitive_data(text: str) -> str:
    """Redact common credential patterns from text.

    Safely handles patterns with different group counts. Intended for logs and messages.

    Args:
        text: Input string possibly containing sensitive material.

    Returns:
        Redacted string with secrets masked.

    """
    if not text:
        return text

    def _sub(m: re.Match[str]) -> str:
        groups = m.groups()
        # One-group pattern: mask entire match
        if len(groups) == 1:
            return mask_secret(groups[0])
        # Two/three-group patterns: mask the middle secret
        if len(groups) >= 2:
            prefix = groups[0]
            secret = groups[1]
            suffix = groups[2] if len(groups) >= 3 else ""
            return f"{prefix}{mask_secret(secret)}{suffix}"
        # Fallback to original text (should not happen with defined patterns)
        return m.group(0)

    result = text
    for pattern in _REDACT_PATTERNS:
        result = pattern.sub(_sub, result)
    return result
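
The group-count dispatch in `_sub` can be illustrated with a few stand-in patterns. The patterns below are hypothetical and are not the library's `_REDACT_PATTERNS`, which this page does not show; `mask` is a simplified stand-in for `mask_secret`.

```python
import re


def mask(secret: str) -> str:
    """Simplified stand-in for mask_secret: always hide the whole value."""
    return "****"


# Hypothetical patterns for illustration; the real _REDACT_PATTERNS may differ.
PATTERNS = [
    re.compile(r"(AKIA[0-9A-Z]{16})"),        # one group: the secret itself
    re.compile(r"(password=)(\S+)"),          # two groups: prefix, secret
    re.compile(r'("token":\s*")([^"]+)(")'),  # three groups: prefix, secret, suffix
]


def _sub(m: re.Match) -> str:
    groups = m.groups()
    if len(groups) == 1:
        return mask(groups[0])
    if len(groups) >= 2:
        suffix = groups[2] if len(groups) >= 3 else ""
        return f"{groups[0]}{mask(groups[1])}{suffix}"
    return m.group(0)


def redact(text: str) -> str:
    for pattern in PATTERNS:
        text = pattern.sub(_sub, text)
    return text


print(redact("key AKIAABCDEFGHIJKLMNOP"))  # key ****
print(redact("password=hunter2 retry"))    # password=**** retry
print(redact('{"token": "abc123"}'))       # {"token": "****"}
```

Keeping the prefix and suffix in their own groups lets the surrounding context (key name, quotes) survive while only the secret is masked.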

constants

Constants for MLflow Secrets Auth.

This module centralizes all configuration constants, environment variable names, default values, and magic strings used throughout the project.

messages

User-facing messages for MLflow Secrets Auth.

This module centralizes all user-facing messages including CLI output, error messages, log messages, and help text.

providers

Empty file to make providers a package.

aws_secrets_manager

AWS Secrets Manager authentication provider.

AWSSecretsManagerAuthProvider()

Bases: SecretsBackedAuthProvider

Authentication provider using AWS Secrets Manager.

Requires the optional dependency boto3.

Environment variables

  • AWS_REGION: AWS region (e.g., "eu-west-1"). Required.
  • MLFLOW_AWS_SECRET_ID: Secret identifier or ARN. Required.
  • MLFLOW_AWS_AUTH_MODE: "bearer" (default) or "basic".
  • MLFLOW_AWS_TTL_SEC: Cache TTL in seconds (defaults to provider's default TTL).

Initialize the provider with a default TTL and lazy AWS client.

Source code in src/mlflow_secrets_auth/providers/aws_secrets_manager.py
def __init__(self) -> None:
    """Initialize the provider with a default TTL and lazy AWS client."""
    super().__init__(PROVIDER_AWS, default_ttl=DEFAULT_TTL_SECONDS)
    self._secrets_client: Any | None = None  # boto3 client when available

azure_key_vault

Azure Key Vault authentication provider.

AzureKeyVaultAuthProvider()

Bases: SecretsBackedAuthProvider

Authentication provider using Azure Key Vault.

Requires optional dependencies: azure-identity and azure-keyvault-secrets.

Environment variables

  • AZURE_KEY_VAULT_URL: Full Key Vault URL (e.g., "https://myvault.vault.azure.net"). Required.
  • MLFLOW_AZURE_SECRET_NAME: Secret name to retrieve. Required.
  • MLFLOW_AZURE_AUTH_MODE: "bearer" (default) or "basic".
  • MLFLOW_AZURE_TTL_SEC: Cache TTL in seconds (defaults to provider's default TTL).

Initialize the provider with a default TTL and a lazy SecretClient.

Source code in src/mlflow_secrets_auth/providers/azure_key_vault.py
def __init__(self) -> None:
    """Initialize the provider with a default TTL and a lazy SecretClient."""
    super().__init__(PROVIDER_AZURE, default_ttl=DEFAULT_TTL_SECONDS)
    self._secret_client: Any | None = None  # azure.keyvault.secrets.SecretClient when available

vault

HashiCorp Vault authentication provider.

VaultAuthProvider()

Bases: SecretsBackedAuthProvider

Authentication provider using HashiCorp Vault.

Supports token and AppRole authentication via the hvac client (optional dependency). Secrets are retrieved from KV v2 when possible with a graceful fallback to KV v1.

Environment variables

  • VAULT_ADDR: Vault server address, e.g. "https://vault.example.com".
  • VAULT_TOKEN: Vault token for direct authentication (optional).
  • VAULT_ROLE_ID: AppRole role ID (used if VAULT_TOKEN is not provided).
  • VAULT_SECRET_ID: AppRole secret ID (used if VAULT_TOKEN is not provided).
  • MLFLOW_VAULT_SECRET_PATH: Secret path (e.g. "secret/mlflow/auth" or "secret/data/mlflow/auth").
  • MLFLOW_VAULT_AUTH_MODE: "bearer" (default) or "basic".
  • MLFLOW_VAULT_TTL_SEC: Cache TTL in seconds (defaults to provider's default TTL).

Notes
  • When using KV v2, this implementation auto-detects common path formats and reads via client.secrets.kv.v2.read_secret_version.
  • For KV v1, it falls back to client.secrets.kv.v1.read_secret.
  • Secret dictionaries are JSON-encoded for centralized parsing in the base class.

Initialize the provider with a default TTL and a lazy hvac client.

Source code in src/mlflow_secrets_auth/providers/vault.py
def __init__(self) -> None:
    """Initialize the provider with a default TTL and a lazy hvac client."""
    super().__init__(PROVIDER_VAULT, default_ttl=DEFAULT_TTL_SECONDS)
    self._vault_client: Any | None = None  # hvac.Client if available
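
The documented precedence between token and AppRole credentials could look roughly like the sketch below. `select_vault_auth` is a hypothetical helper, not part of the provider, and the assumption that AppRole needs both the role ID and the secret ID is inferred from the variable descriptions above.

```python
from collections.abc import Mapping
from typing import Optional


def select_vault_auth(env: Mapping[str, str]) -> Optional[str]:
    """Return which Vault auth method the given environment would select.

    Hypothetical helper: a token wins when present; otherwise AppRole is
    used if both the role ID and the secret ID are set.
    """
    if env.get("VAULT_TOKEN"):
        return "token"
    if env.get("VAULT_ROLE_ID") and env.get("VAULT_SECRET_ID"):
        return "approle"
    return None


print(select_vault_auth({"VAULT_TOKEN": "t", "VAULT_ROLE_ID": "r", "VAULT_SECRET_ID": "s"}))  # token
print(select_vault_auth({"VAULT_ROLE_ID": "r", "VAULT_SECRET_ID": "s"}))                      # approle
print(select_vault_auth({"VAULT_ROLE_ID": "r"}))                                              # None
```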

utils

Utility functions for MLflow secrets auth providers.

This module centralizes
  • Logger setup with environment-driven log levels.
  • Safe logging with automatic redaction of sensitive substrings.
  • Secret parsing with automatic format detection (JSON vs. plain string).
  • URL allowlist checks.
  • Small helpers (duration formatting, TTL validation, masking).
  • Retry functionality with exponential backoff and jitter.

format_duration(seconds)

Format a duration in seconds into a short human-readable string.

Examples:

  • 45 -> "45s"
  • 125 -> "2m 5s"
  • 3600 -> "1h"

Parameters:

Name Type Description Default
seconds int

Duration in seconds.

required

Returns:

Type Description
str

Short human-readable representation.

Source code in src/mlflow_secrets_auth/utils.py
def format_duration(seconds: int) -> str:
    """Format a duration in seconds into a short human-readable string.

    Examples:
        45 -> "45s"
        125 -> "2m 5s"
        3600 -> "1h"

    Args:
        seconds: Duration in seconds.

    Returns:
        Short human-readable representation.

    """
    if seconds < 60:
        return f"{seconds}s"
    if seconds < 3600:
        m, s = divmod(seconds, 60)
        return f"{m}m" if s == 0 else f"{m}m {s}s"
    h, rem = divmod(seconds, 3600)
    m = rem // 60
    return f"{h}h" if m == 0 else f"{h}h {m}m"
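
A few more inputs show how the granularity changes with magnitude. The function body is copied from the listing so the snippet runs on its own.

```python
def format_duration(seconds: int) -> str:
    """Copy of the listing above."""
    if seconds < 60:
        return f"{seconds}s"
    if seconds < 3600:
        m, s = divmod(seconds, 60)
        return f"{m}m" if s == 0 else f"{m}m {s}s"
    h, rem = divmod(seconds, 3600)
    m = rem // 60
    return f"{h}h" if m == 0 else f"{h}h {m}m"


for value in [59, 120, 125, 3600, 3725]:
    print(value, "->", format_duration(value))
# 59 -> 59s
# 120 -> 2m
# 125 -> 2m 5s
# 3600 -> 1h
# 3725 -> 1h 2m
```

From one hour upward the seconds component is dropped, so 3725 seconds renders as "1h 2m" rather than "1h 2m 5s".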

is_host_allowed(url, allowed_hosts)

Return whether the URL's host is in the provided allowlist.

Supports exact hostname matches and wildcard patterns using shell-style globbing (e.g., "*.corp.example.com" matches "api.corp.example.com").

Hostname matching is case-insensitive as per DNS standards.

Examples:

  • "example.com" matches exactly "example.com"
  • "*.corp.example.com" matches "api.corp.example.com", "web.corp.example.com"
  • "mlflow.*.com" matches "mlflow.prod.com", "mlflow.staging.com"

Parameters:

Name Type Description Default
url str

Full URL to check.

required
allowed_hosts list[str] | None

List of allowed hostname patterns, or None to allow all.

required

Returns:

Type Description
bool

True if allowed (or no allowlist configured), otherwise False.

Source code in src/mlflow_secrets_auth/utils.py
def is_host_allowed(url: str, allowed_hosts: list[str] | None) -> bool:
    """Return whether the URL's host is in the provided allowlist.

    Supports exact hostname matches and wildcard patterns using shell-style
    globbing (e.g., "*.corp.example.com" matches "api.corp.example.com").

    Hostname matching is case-insensitive as per DNS standards.

    Examples:
        - "example.com" matches exactly "example.com"
        - "*.corp.example.com" matches "api.corp.example.com", "web.corp.example.com"
        - "mlflow.*.com" matches "mlflow.prod.com", "mlflow.staging.com"

    Args:
        url: Full URL to check.
        allowed_hosts: List of allowed hostname patterns, or None to allow all.

    Returns:
        True if allowed (or no allowlist configured), otherwise False.

    """
    if allowed_hosts is None:
        return True
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            return False

        # Normalize hostname to lowercase for case-insensitive comparison
        hostname = hostname.lower()

        # Check each pattern in the allowlist
        for pattern in allowed_hosts:
            # Normalize pattern to lowercase as well
            pattern_lower = pattern.lower()
            if fnmatch.fnmatch(hostname, pattern_lower):
                return True

        return False
    except Exception:
        return False
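
The matching rules can be exercised with a local copy of the function. The host names below are hypothetical; the behavior shown follows from `urllib.parse.urlparse` and `fnmatch.fnmatch` in the standard library.

```python
import fnmatch
from typing import List, Optional
from urllib.parse import urlparse


def is_host_allowed(url: str, allowed_hosts: Optional[List[str]]) -> bool:
    """Local copy of the listing above."""
    if allowed_hosts is None:
        return True
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            return False
        hostname = hostname.lower()
        return any(fnmatch.fnmatch(hostname, p.lower()) for p in allowed_hosts)
    except Exception:
        return False


# Case-insensitive wildcard match.
print(is_host_allowed("https://API.corp.example.com/x", ["*.corp.example.com"]))  # True
# The wildcard pattern does not cover the bare parent domain.
print(is_host_allowed("https://corp.example.com", ["*.corp.example.com"]))        # False
# "*" also matches dots, so it can span several labels.
print(is_host_allowed("https://mlflow.evil.example.com", ["mlflow.*.com"]))       # True
# None disables the check, while an empty list allows nothing.
print(is_host_allowed("https://anything.test", None))                             # True
print(is_host_allowed("https://anything.test", []))                               # False
# Without a scheme, urlparse finds no hostname.
print(is_host_allowed("example.com/path", ["example.com"]))                       # False
```

Because `*` in shell-style globbing matches any characters including `.`, a pattern such as "mlflow.*.com" is broader than a single-label wildcard; narrower patterns are safer when the allowlist guards credentials.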

mask_secret(secret, show_chars=DEFAULT_SHOW_CHARS)

Mask a secret for safe logging.

For short inputs (<= 2 * show_chars) returns a generic "***" to avoid revealing almost the entire secret.

Parameters:

Name Type Description Default
secret str

Secret value.

required
show_chars int

Number of leading and trailing characters to keep.

DEFAULT_SHOW_CHARS

Returns:

Type Description
str

Masked representation of the secret.

Source code in src/mlflow_secrets_auth/utils.py
def mask_secret(secret: str, show_chars: int = DEFAULT_SHOW_CHARS) -> str:
    """Mask a secret for safe logging.

    For short inputs (<= 2 * show_chars) returns a generic "***" to avoid
    revealing almost the entire secret.

    Args:
        secret: Secret value.
        show_chars: Number of leading and trailing characters to keep.

    Returns:
        Masked representation of the secret.

    """
    if not secret or len(secret) <= show_chars * 2:
        return DEFAULT_MASK_CHAR * 3
    return f"{secret[:show_chars]}...{secret[-show_chars:]}"

parse_secret_json(secret_value)

Parse secret material with automatic format detection.

Accepts either
  • JSON object with one of:
    • {"token": "<opaque token>"}
    • {"username": "...", "password": "..."}
  • Plain string:
    • "username:password" → {"username": "...", "password": "..."}
    • "<token>" → {"token": "<token>"}

Whitespace is stripped from string fields.

Parameters:

Name Type Description Default
secret_value str

Raw secret value.

required

Returns:

Type Description
dict[str, str]

A normalized dict with either {"token": "..."} or {"username": "...", "password": "..."}.

Raises:

Type Description
ValueError

If the secret is empty, the JSON value is not an object, or required fields are missing or blank.

Source code in src/mlflow_secrets_auth/utils.py
def parse_secret_json(secret_value: str) -> dict[str, str]:
    """Parse secret material with automatic format detection.

    Accepts either:
      * JSON object with one of:
          - {"token": "<opaque token>"}
          - {"username": "...", "password": "..."}
      * Plain string:
          - "username:password" → {"username": "...", "password": "..."}
          - "<token>" → {"token": "<token>"}

    Whitespace is stripped from string fields.

    Args:
        secret_value: Raw secret value.

    Returns:
        A normalized dict with either {"token": "..."} or {"username": "...", "password": "..."}.

    Raises:
        ValueError: If the JSON object is invalid or missing required fields.

    """
    # First attempt: JSON object
    try:
        data = json.loads(secret_value)
    except json.JSONDecodeError:
        # Fallback to plain string
        value = secret_value.strip()
        if not value:
            raise ValueError(ERROR_SECRET_EMPTY) from None

        if ":" in value:
            username, password = value.split(":", 1)
            username = (username or "").strip()
            password = (password or "").strip()
            if not username or not username.strip():
                raise ValueError(ERROR_SECRET_USERNAME_INVALID) from None
            if not password or not password.strip():
                raise ValueError(ERROR_SECRET_PASSWORD_INVALID) from None
            return {SECRET_FIELD_USERNAME: username, SECRET_FIELD_PASSWORD: password}
        return {SECRET_FIELD_TOKEN: value}

    if not isinstance(data, dict):
        raise ValueError(ERROR_SECRET_INVALID_JSON)

    # Token-based secret
    if SECRET_FIELD_TOKEN in data:
        token = data[SECRET_FIELD_TOKEN]
        if not isinstance(token, str) or not token.strip():
            raise ValueError(ERROR_SECRET_TOKEN_INVALID)
        return {SECRET_FIELD_TOKEN: token.strip()}

    # Username/password secret
    if SECRET_FIELD_USERNAME in data and SECRET_FIELD_PASSWORD in data:
        username = data[SECRET_FIELD_USERNAME]
        password = data[SECRET_FIELD_PASSWORD]
        if not isinstance(username, str) or not username.strip():
            raise ValueError(ERROR_SECRET_USERNAME_INVALID)
        if not isinstance(password, str) or not password.strip():
            raise ValueError(ERROR_SECRET_PASSWORD_INVALID)
        return {SECRET_FIELD_USERNAME: username.strip(), SECRET_FIELD_PASSWORD: password.strip()}

    raise ValueError(
        ERROR_SECRET_MISSING_FIELDS,
    )
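
The format detection can be sketched with a condensed local version. It assumes the `SECRET_FIELD_*` constants are "token", "username", and "password", as the docstring suggests; the error messages are placeholders, not the library's `ERROR_*` strings, and `parse_secret` is a hypothetical name.

```python
import json
from typing import Dict


def parse_secret(secret_value: str) -> Dict[str, str]:
    """Condensed sketch of the detection order in the listing above."""
    try:
        data = json.loads(secret_value)
    except json.JSONDecodeError:
        # Not JSON: treat as "username:password" or as an opaque token.
        value = secret_value.strip()
        if not value:
            raise ValueError("secret is empty") from None
        if ":" in value:
            username, password = (part.strip() for part in value.split(":", 1))
            if not username or not password:
                raise ValueError("username and password must be non-empty") from None
            return {"username": username, "password": password}
        return {"token": value}

    if not isinstance(data, dict):
        raise ValueError("JSON secret must be an object")
    if "token" in data:
        token = data["token"]
        if not isinstance(token, str) or not token.strip():
            raise ValueError("token must be a non-empty string")
        return {"token": token.strip()}
    if "username" in data and "password" in data:
        username, password = data["username"], data["password"]
        if not all(isinstance(v, str) and v.strip() for v in (username, password)):
            raise ValueError("username and password must be non-empty strings")
        return {"username": username.strip(), "password": password.strip()}
    raise ValueError("secret must contain token or username and password")


print(parse_secret('{"token": " abc "}'))  # {'token': 'abc'}
print(parse_secret("alice:s3cr:et"))       # {'username': 'alice', 'password': 's3cr:et'}
print(parse_secret("opaque-token"))        # {'token': 'opaque-token'}

try:
    parse_secret("12345")  # valid JSON (a number), but not an object
except ValueError as exc:
    print("rejected:", exc)
```

Two consequences of trying JSON first are worth keeping in mind: a plain token that happens to be valid JSON, such as a purely numeric string, is rejected instead of being treated as a token, and a plain token containing ":" is read as a username and password pair.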

retry_with_jitter(fn, attempts=3, base_delay=0.1, backoff=2.0, max_delay=1.0, jitter=0.4, sleep=time.sleep)

Retry a function with exponential backoff and jitter.

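Calls fn up to attempts times. Between failed attempts it sleeps for an exponentially growing delay that is capped at max_delay and then randomized by ±jitter (as a fraction of the delay). If all attempts fail, re-raises the last exception.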

Parameters:

Name Type Description Default
fn Callable[[], T]

Function to call (should take no arguments).

required
attempts int

Maximum number of attempts (must be >= 1).

3
base_delay float

Initial delay in seconds.

0.1
backoff float

Exponential backoff multiplier.

2.0
max_delay float

Maximum delay between attempts in seconds.

1.0
jitter float

Jitter factor as a proportion (e.g., 0.4 = ±40%).

0.4
sleep Callable[[float], None]

Sleep function (mainly for testing).

time.sleep

Returns:

Type Description
T

Result of the successful function call.

Raises:

Type Description
Exception

The last exception encountered if all attempts fail.

Source code in src/mlflow_secrets_auth/utils.py
def retry_with_jitter(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.1,
    backoff: float = 2.0,
    max_delay: float = 1.0,
    jitter: float = 0.4,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry a function with exponential backoff and jitter.

    Calls `fn` up to `attempts` times with exponential backoff and ±jitter%,
    capped by `max_delay`. If all attempts fail, reraises the last exception.

    Args:
        fn: Function to call (should take no arguments).
        attempts: Maximum number of attempts (must be >= 1).
        base_delay: Initial delay in seconds.
        backoff: Exponential backoff multiplier.
        max_delay: Maximum delay between attempts in seconds.
        jitter: Jitter factor as a proportion (e.g., 0.4 = ±40%).
        sleep: Sleep function (mainly for testing).

    Returns:
        Result of the successful function call.

    Raises:
        Exception: The last exception encountered if all attempts fail.

    """
    last_exception = None

    for attempt in range(attempts):
        try:
            return fn()
        except Exception as e:
            last_exception = e

            # Don't sleep after the last attempt
            if attempt == attempts - 1:
                break

            # Calculate delay with exponential backoff
            delay = min(base_delay * (backoff ** attempt), max_delay)

            # Add jitter: ±jitter% of the delay
            jitter_amount = delay * jitter * (2 * random.random() - 1)  # noqa: S311
            final_delay = max(0, delay + jitter_amount)

            sleep(final_delay)

    # Re-raise the last exception if all attempts failed
    if last_exception is not None:
        raise last_exception

    # Reached only when attempts < 1, so fn was never called
    msg = "No attempts were made"
    raise RuntimeError(msg)
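
A usage sketch with an injected `sleep` shows the delays without actually waiting. The retry function is a condensed copy of the listing; `flaky_fetch` is a hypothetical operation that fails twice before succeeding.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_jitter(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.1,
    backoff: float = 2.0,
    max_delay: float = 1.0,
    jitter: float = 0.4,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Condensed copy of the listing above so the example runs on its own."""
    last_exception = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as e:
            last_exception = e
            if attempt == attempts - 1:
                break
            # Cap first, then apply jitter as a fraction of the capped delay.
            delay = min(base_delay * (backoff ** attempt), max_delay)
            delay += delay * jitter * (2 * random.random() - 1)
            sleep(max(0, delay))
    if last_exception is not None:
        raise last_exception
    raise RuntimeError("No attempts were made")


calls = {"count": 0}


def flaky_fetch() -> str:
    """Hypothetical operation that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "secret-value"


recorded: List[float] = []
result = retry_with_jitter(flaky_fetch, sleep=recorded.append)
print(result)         # secret-value
print(len(recorded))  # 2: one sleep after each failed attempt
print(recorded)       # roughly [0.06..0.14, 0.12..0.28] with the defaults
```

Since the cap is applied before the jitter, an individual sleep can exceed `max_delay` by up to the jitter fraction.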

safe_log(logger, level, message, *args)

Log a message with automatic redaction of sensitive data.

The message is first formatted with args (printf-style) and only then passed through the redactor to avoid leaking secrets via formatting.

Parameters:

Name Type Description Default
logger Logger

Target logger.

required
level int

Logging level (e.g., logging.INFO).

required
message str

Format string.

required
*args Any

Arguments for printf-style substitution.

()

Source code in src/mlflow_secrets_auth/utils.py
def safe_log(logger: logging.Logger, level: int, message: str, *args: Any) -> None:
    """Log a message with automatic redaction of sensitive data.

    The message is first formatted with `args` (printf-style) and only then
    passed through the redactor to avoid leaking secrets via formatting.

    Args:
        logger: Target logger.
        level: Logging level (e.g., `logging.INFO`).
        message: Format string.
        *args: Arguments for printf-style substitution.

    """
    if args:
        try:
            message = message % args
        except Exception:
            # Fall back to a simple join if interpolation fails for any reason
            message = " ".join([message, *map(str, args)])
    redacted_message = redact_sensitive_data(message)
    logger.log(level, redacted_message)
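
The format-then-redact order, including the fallback when interpolation fails, can be sketched as follows. `redact` is a hypothetical one-pattern stand-in for `redact_sensitive_data`, and the logger name is arbitrary.

```python
import io
import logging
import re
from typing import Any


def redact(text: str) -> str:
    """Hypothetical stand-in for redact_sensitive_data."""
    return re.sub(r"(token=)(\S+)", r"\1****", text)


def safe_log(logger: logging.Logger, level: int, message: str, *args: Any) -> None:
    """Same flow as the listing above, using the stand-in redactor."""
    if args:
        try:
            message = message % args
        except Exception:
            message = " ".join([message, *map(str, args)])
    logger.log(level, redact(message))


# Capture output in memory so the result can be inspected.
stream = io.StringIO()
demo_logger = logging.getLogger("safe_log_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(logging.StreamHandler(stream))

# Normal case: the secret arrives through an argument and is still redacted.
safe_log(demo_logger, logging.INFO, "fetched %s", "token=abc123")
# No placeholder for the argument: interpolation fails and the join fallback is used.
safe_log(demo_logger, logging.INFO, "no placeholders", "token=abc123")

print(stream.getvalue())
# fetched token=****
# no placeholders token=****
```

Redacting after interpolation matters because the secret usually arrives through `args`, not through the format string itself.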

setup_logger(name)

Create or configure a namespaced logger.

The logger level is always driven by the MLFLOW_SECRETS_LOG_LEVEL env var. A single stream handler is attached once; propagation is disabled to avoid duplicated messages under test runners or frameworks.

Parameters:

Name Type Description Default
name str

Logger name (typically package.module).

required

Returns:

Type Description
Logger

A configured logging.Logger instance.

Source code in src/mlflow_secrets_auth/utils.py
def setup_logger(name: str) -> logging.Logger:
    """Create or configure a namespaced logger.

    The logger level is always driven by the `MLFLOW_SECRETS_LOG_LEVEL` env var.
    A single stream handler is attached once; propagation is disabled to avoid
    duplicated messages under test runners or frameworks.

    Args:
        name: Logger name (typically package.module).

    Returns:
        A configured `logging.Logger` instance.

    """
    logger = logging.getLogger(name)

    # Always set the level from config
    level_name = get_log_level()
    level = getattr(logging, level_name, logging.INFO)
    logger.setLevel(level)

    # Only add a handler if none exist (avoid duplicate logs under pytest)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Avoid double logging through parent loggers
    logger.propagate = False
    return logger

validate_ttl(ttl_seconds, *, default=DEFAULT_TTL_SECONDS, min_ttl=MIN_TTL_SECONDS, max_ttl=MAX_TTL_SECONDS)

Validate and clamp a TTL value.

Rules
  • If ttl_seconds is None or <= 0, use default.
  • Clamp the final value between min_ttl and max_ttl (inclusive).

Parameters:

Name Type Description Default
ttl_seconds int | None

Requested TTL in seconds.

required
default int

Fallback TTL when input is invalid or not provided.

DEFAULT_TTL_SECONDS
min_ttl int

Minimum allowed TTL (inclusive).

MIN_TTL_SECONDS
max_ttl int

Maximum allowed TTL (inclusive).

MAX_TTL_SECONDS

Returns:

Type Description
int

A valid TTL in seconds.

Source code in src/mlflow_secrets_auth/utils.py
def validate_ttl(
    ttl_seconds: int | None,
    *,
    default: int = DEFAULT_TTL_SECONDS,
    min_ttl: int = MIN_TTL_SECONDS,
    max_ttl: int = MAX_TTL_SECONDS,
) -> int:
    """Validate and clamp a TTL value.

    Rules:
      * If `ttl_seconds` is None or <= 0, use `default`.
      * Clamp the final value between `min_ttl` and `max_ttl` (inclusive).

    Args:
        ttl_seconds: Requested TTL in seconds.
        default: Fallback TTL when input is invalid or not provided.
        min_ttl: Minimum allowed TTL (inclusive).
        max_ttl: Maximum allowed TTL (inclusive).

    Returns:
        A valid TTL in seconds.

    """
    try:
        ttl = int(ttl_seconds) if ttl_seconds is not None else int(default)
    except (TypeError, ValueError):
        ttl = int(default)

    if ttl <= 0:
        ttl = int(default)

    if ttl < min_ttl:
        ttl = min_ttl
    elif ttl > max_ttl:
        ttl = max_ttl

    return ttl
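
The fallback and clamping rules can be checked with a local copy. The three constants below are hypothetical values chosen for illustration; the real ones live in the constants module and are not shown on this page.

```python
from typing import Optional

# Hypothetical bounds for illustration only.
DEFAULT_TTL_SECONDS = 300
MIN_TTL_SECONDS = 1
MAX_TTL_SECONDS = 3600


def validate_ttl(
    ttl_seconds: Optional[int],
    *,
    default: int = DEFAULT_TTL_SECONDS,
    min_ttl: int = MIN_TTL_SECONDS,
    max_ttl: int = MAX_TTL_SECONDS,
) -> int:
    """Local copy of the listing above."""
    try:
        ttl = int(ttl_seconds) if ttl_seconds is not None else int(default)
    except (TypeError, ValueError):
        ttl = int(default)
    if ttl <= 0:
        ttl = int(default)
    # Clamp into [min_ttl, max_ttl].
    return max(min_ttl, min(ttl, max_ttl))


for requested in [None, 0, -5, 120, 10**6]:
    print(requested, "->", validate_ttl(requested))
# None -> 300
# 0 -> 300
# -5 -> 300
# 120 -> 120
# 1000000 -> 3600

print(validate_ttl(5, min_ttl=60))  # 60: raised to the minimum
```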