langflow API流泄漏

tf7tbtn2  于 4个月前  发布在  其他
关注(0)|答案(1)|浏览(39)

描述bug

当使用API流式传输聊天输出时,流URL端点是开放的。这可能导致如果两个设备调用相同的端点,消息泄露的可能性。

重现问题

重现行为所需的步骤:

  1. 使用playground的基本流程进行调用
  2. 转到任何浏览器或设备并访问:http://<YOUR_ADDR>/api/v1/build/<YOUR_FLOW_ID>/<CHAT_OUTPUT_ID>/stream。
    示例:( https://langflow.brasil.com/api/v1/build/90deb8be-4e8c-422a-b35f-45042a2c49c9/ChatOutput-EzD73/stream )
  3. 两个设备都可以访问相同的输出,提供的URL上没有身份验证或随机性。

附加上下文

问题在于,当前的langflow实现使得使用API和流式传输多用户应用程序变得不可能。

cs7cruho

cs7cruho1#

你好,@rafael-junio!我在这里帮助你解决任何bug、问题或贡献。让我们一起解决这个问题!
为了保护API流端点并防止两个设备调用相同端点时的未经授权访问和消息泄露,你可以实施以下措施:

  1. 身份验证:确保API需要身份验证,使用API密钥或其他身份验证机制。Langflow支持API密钥功能,可以将API密钥包含在请求头或查询参数中以对API调用进行身份验证。
  2. 会话管理:为每个设备使用唯一的会话令牌,确保每个设备都有自己的会话。可以通过在启动会话时生成唯一的会话ID来实现这一点。
  3. 安全通信:确保与API的所有通信都通过HTTPS进行,以防止窃听。
  4. 令牌更新:实现令牌更新以处理过期的令牌并维护安全会话。
    这是提供了这些安全措施的更新后的代码:
import axios, { AxiosError, AxiosInstance } from "axios";
import { Cookies } from "react-cookie";

class LangflowClient {
    constructor(baseURL, apiKey) {
        this.baseURL = baseURL;
        this.apiKey = apiKey;
        this.api = axios.create({ baseURL: this.baseURL });
        this.cookies = new Cookies();

        // Add request interceptor to include the access token
        this.api.interceptors.request.use(
            (config) => {
                const accessToken = this.cookies.get("access_token_lf");
                if (accessToken) {
                    config.headers["Authorization"] = `Bearer ${accessToken}`;
                }
                if (this.apiKey) {
                    config.headers["x-api-key"] = this.apiKey;
                }
                return config;
            },
            (error) => Promise.reject(error)
        );

        // Add response interceptor to handle token renewal
        this.api.interceptors.response.use(
            (response) => response,
            async (error) => {
                if (error.response?.status === 401 || error.response?.status === 403) {
                    const newToken = await this.renewAccessToken();
                    if (newToken) {
                        error.config.headers["Authorization"] = `Bearer ${newToken}`;
                        return axios.request(error.config);
                    }
                }
                return Promise.reject(error);
            }
        );
    }

    async renewAccessToken() {
        try {
            const response = await axios.post(`${this.baseURL}/renew-token`, {
                refreshToken: this.cookies.get("refresh_token_lf"),
            });
            const { accessToken } = response.data;
            this.cookies.set("access_token_lf", accessToken);
            return accessToken;
        } catch (error) {
            console.error("Token renewal failed:", error);
            return null;
        }
    }

    async post(endpoint, body, headers = {"Content-Type": "application/json"}) {
        const url = `${this.baseURL}${endpoint}`;
        try {
            const response = await this.api.post(url, body, { headers });
            return response.data;
        } catch (error) {
            console.error('Request Error:', error);
            throw error;
        }
    }

    async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
        const sessionId = this.generateSessionId();
        const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
        return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
    }

    generateSessionId() {
        return 'xxxx-xxxx-xxxx-xxxx'.replace(/[x]/g, () => {
            return (Math.random() * 16 | 0).toString(16);
        });
    }

    handleStream(streamUrl, onUpdate, onClose, onError) {
        const eventSource = new EventSource(streamUrl);

        eventSource.onmessage = event => {
            const data = JSON.parse(event.data);
            onUpdate(data);
        };

        eventSource.onerror = event => {
            console.error('Stream Error:', event);
            onError(event);
            eventSource.close();
        };

        eventSource.addEventListener("close", () => {
            onClose('Stream closed');
            eventSource.close();
        });

        return eventSource;
    }

    async runFlow(flowIdOrName, inputValue, tweaks, stream = false, onUpdate, onClose, onError) {
        try {
            const initResponse = await this.initiateSession(flowIdOrName, inputValue, stream, tweaks);
            console.log('Init Response:', initResponse);
            if (stream && initResponse && initResponse.outputs && initResponse.outputs[0].outputs[0].artifacts.stream_url) {
                const streamUrl = initResponse.outputs[0].outputs[0].artifacts.stream_url;
                console.log(`Streaming from: ${streamUrl}`);
                this.handleStream(streamUrl, onUpdate, onClose, onError);
            }
            return initResponse;
        } catch (error) {
            console.error('Error running flow:', error);
            onError('Error initiating session');
        }
    }
}

async function main() {
    const flowIdOrName = '${endpointName || flowId}';
    const inputValue = 'User message';
    const stream = false;
    const langflowClient = new LangflowClient('${window.location.protocol}//${window.location.host}',
        ${isAuth ? "'your-api-key'" : "null"});
    const tweaks = ${tweaksString};
    response = await langflowClient.runFlow(
        flowIdOrName,
        inputValue,
        tweaks,
        stream,
        (data) => console.log("Received:", data.chunk), // onUpdate
        (message) => console.log("Stream Closed:", message), // onClose
        (error) => console.log("Stream Error:", error) // onError
    );
    if (!stream) {
        const flowOutputs = response.outputs[0];
        const firstComponentOutputs = flowOutputs.outputs[0];
        const output = firstComponentOutputs.outputs.message;

        console.log("Final Output:", output.message.text);
    }
}

main();

在这个更新后的代码中:

  • 使用 generateSessionId 方法为每个会话生成唯一的会话ID。
  • 将会话ID包含在API请求中,以确保每个设备都有自己的会话。
  • 如果提供了API密钥,则使用 Authorization 头和 x-api-key 头来保护API端点。
  • 实现令牌更新以处理过期的令牌并维护安全会话。

此外,ApiInterceptor 来自 langflow 项目确保将访问令牌添加到请求中,并处理令牌更新和错误管理 1(https://github.com/langflow-ai/langflow/blob/dev/src/frontend/src/modals/apiModal/utils/get-js-api-code.tsx)2(https://github.com/langflow-ai/langflow/blob/dev/src/frontend/src/controllers/API/api.tsx)3(https://github.com/logspace-ai/langflow/blob/dev/src/frontend/src/controllers/API/api.tsx)。

  • 要继续与 Dosu 交谈,请提及 @dosu。*

相关问题