langchain4j [特性] 使用StreamingChatLanguageModel/TokenStream时取消流式传输的方法

r7knjye2  于 4个月前  发布在  其他
关注(0)|答案(4)|浏览(51)

我想取消使用StreamingChatLanguageModel.generate(...),但是这个方法返回的是void。也许可以返回ResponseHandle。

描述bug

日志和堆栈跟踪

重现

预期行为

请填写以下信息:

  • LangChain4j版本:
  • 使用的LLM(s):
  • Java版本:
  • Spring Boot版本(如果适用):

附加上下文

lqfhib0f

lqfhib0f1#

要解决在使用 StreamingChatLanguageModel.generate(...) 时需要取消操作的需求,您可以修改 CollectorResponseHandler 以包含一个用于取消的机制。由于当前的方法返回 void,您可以在处理器中引入一个标志或方法来发出取消信号。
以下是如何实现此功能的示例:

  1. CollectorResponseHandler 中添加一个取消标志。
  2. onNextonComplete 方法中检查此标志,如果请求取消,则停止处理。
public static final class CollectorResponseHandler<T> implements StreamingResponseHandler<T> {
    private final List<Response<T>> responses = new ArrayList<>();
    private volatile boolean cancelled = false;

    public List<Response<T>> responses() {
        return responses;
    }

    public void cancel() {
        this.cancelled = true;
    }

    @Override
    public void onNext(String token) {
        if (cancelled) {
            return;
        }
        // Handle the token
    }

    @Override
    public void onError(Throwable error) {
        if (cancelled) {
            return;
        }
        // Handle the error
    }

    @Override
    public void onComplete(Response<T> response) {
        if (cancelled) {
            return;
        }
        responses.add(response);
    }
}

要使用此取消机制,您可以在希望停止操作时调用处理器上的 cancel 方法:

CollectorResponseHandler<AiMessage> handler = new CollectorResponseHandler<>();
// Start the generation process
model.generate(messages, handler);

// At some point later, if you need to cancel
handler.cancel();

这样,generate 方法将尊重取消请求并停止进一步处理令牌或响应。
如果您需要进一步定制或有特定要求,请提供有关您的设置的详细信息,包括 LangChain4j 版本、LLM(s)使用的 Java 版本和 Spring Boot 版本(如适用)。

详细信息

来源

此响应旨在帮助您节省时间并提高效率。它不是 精确解决方案,而是您自己研究的起点。
帮助我变得更有用!如果这有帮助,请留下一个 👍,如果无关紧要,请留下一个 👎。

  • 要继续对话,请提及 @dosu。*
2vuwiymt

2vuwiymt2#

我使用AiServices创建了一个助手流

interface Assistant {
      TokenStream chat(String userMessage);
  }

.... 
 Assistant assistant = AiServices.builder(AssistantAgent.class)
                    .streamingChatLanguageModel(chatStreamModel)
                    .chatMemory(memory)
                    .build();

 assistant.chat("What is the capital of Germany?")
    .onNext(new Consumer<String>() {
        @Override
        public void accept(String s) {
            // how to cancel here
        }
    })
    .onComplete(response -> {
        futureAnswer.complete(answerBuilder.toString());
        futureResponse.complete(response);
    })
    .onError(futureAnswer::completeExceptionally)
    .start();

AiServiceTokenStream无法取消!

5vf7fwbs

5vf7fwbs3#

要使onNext方法内的流响应能够被取消,你可以在AiServiceStreamingResponseHandler类中引入一个标志,该标志表示是否应该取消流。然后,你可以在onNext方法中检查这个标志,如果设置了该标志,则停止处理。
以下是如何修改AiServiceStreamingResponseHandler类以包含此功能的方法:

  1. 修改AiServiceStreamingResponseHandler以支持取消:
class AiServiceStreamingResponseHandler implements StreamingResponseHandler<AiMessage> {

    private final Logger log = LoggerFactory.getLogger(AiServiceStreamingResponseHandler.class);

    private final AiServiceContext context;
    private final Object memoryId;

    private final Consumer<String> tokenHandler;
    private final Consumer<Response<AiMessage>> completionHandler;
    private final Consumer<Throwable> errorHandler;

    private final TokenUsage tokenUsage;

    private volatile boolean cancelRequested = false;

    AiServiceStreamingResponseHandler(AiServiceContext context,
                                      Object memoryId,
                                      Consumer<String> tokenHandler,
                                      Consumer<Response<AiMessage>> completionHandler,
                                      Consumer<Throwable> errorHandler,
                                      TokenUsage tokenUsage) {
        this.context = ensureNotNull(context, "context");
        this.memoryId = ensureNotNull(memoryId, "memoryId");

        this.tokenHandler = ensureNotNull(tokenHandler, "tokenHandler");
        this.completionHandler = completionHandler;
        this.errorHandler = errorHandler;

        this.tokenUsage = ensureNotNull(tokenUsage, "tokenUsage");
    }

    public void requestCancel() {
        this.cancelRequested = true;
    }

    @Override
    public void onNext(String token) {
        if (cancelRequested) {
            return;
        }
        tokenHandler.accept(token);
    }

    @Override
    public void onComplete(Response<AiMessage> response) {
        if (cancelRequested) {
            return;
        }

        AiMessage aiMessage = response.content();

        if (context.hasChatMemory()) {
            context.chatMemory(memoryId).add(aiMessage);
        }

        if (aiMessage.hasToolExecutionRequests()) {
            for (ToolExecutionRequest toolExecutionRequest : aiMessage.toolExecutionRequests()) {
                ToolExecutor toolExecutor = context.toolExecutors.get(toolExecutionRequest.name());
                String toolExecutionResult = toolExecutor.execute(toolExecutionRequest, memoryId);
                ToolExecutionResultMessage toolExecutionResultMessage = ToolExecutionResultMessage.from(
                        toolExecutionRequest,
                        toolExecutionResult
                );
                context.chatMemory(memoryId).add(toolExecutionResultMessage);
            }

            context.streamingChatModel.generate(
                    context.chatMemory(memoryId).messages(),
                    context.toolSpecifications,
                    new AiServiceStreamingResponseHandler(
                            context,
                            memoryId,
                            tokenHandler,
                            completionHandler,
                            errorHandler,
                            TokenUsage.sum(tokenUsage, response.tokenUsage())
                    )
            );
        } else {
            if (completionHandler != null) {
                completionHandler.accept(Response.from(
                        aiMessage,
                        TokenUsage.sum(tokenUsage, response.tokenUsage()),
                        response.finishReason())
                );
            }
        }
    }

    @Override
    public void onError(Throwable error) {
        if (errorHandler != null) {
            try {
                errorHandler.accept(error);
            } catch (Exception e) {
                log.error("While handling the following error...", error);
                log.error("...the following error happened", e);
            }
        } else {
            log.warn("Ignored error", error);
        }
    }
}
  1. 更新TokenStream接口及其实现以支持取消:
    你需要确保TokenStream接口及其实现能够处理取消请求。以下是修改TokenStream接口及其实现的示例:
interface TokenStream {
    TokenStream onNext(Consumer<String> tokenHandler);
    TokenStream onComplete(Consumer<Response<AiMessage>> completionHandler);
    TokenStream onError(Consumer<Throwable> errorHandler);
    void start();
    void cancel(); // Add this method to support cancellation
}

class AiServiceTokenStream implements TokenStream {

    private final AiServiceContext context;
    private final Object memoryId;
    private Consumer<String> tokenHandler;
    private Consumer<Response<AiMessage>> completionHandler;
    private Consumer<Throwable> errorHandler;
    private AiServiceStreamingResponseHandler handler;

    AiServiceTokenStream(AiServiceContext context, Object memoryId) {
        this.context = context;
        this.memoryId = memoryId;
    }

    @Override
    public TokenStream onNext(Consumer<String> tokenHandler) {
        this.tokenHandler = tokenHandler;
        return this;
    }

    @Override
    public TokenStream onComplete(Consumer<Response<AiMessage>> completionHandler) {
        this.completionHandler = completionHandler;
        return this;
    }

    @Override
    public TokenStream onError(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
        return this;
    }

    @Override
    public void start() {
        handler = new AiServiceStreamingResponseHandler(
                context,
                memoryId,
                tokenHandler,
                completionHandler,
                errorHandler,
                new TokenUsage()
        );

        if (context.streamingChatModel != null) {
            context.streamingChatModel.generate(messagesToSend, handler);
        } else {
            context.toolSpecifications.generate(messagesToSend, handler);
        }
    }

    @Override
    public void cancel() {
        if (handler != null) {
            handler.requestCancel();
        }
    }
}
  1. 在你Assistant实现中使用cancel方法:
    现在,你可以在你Assistant实现中使用cancel方法来取消onNext方法内的流响应。
interface Assistant {
    TokenStream chat(String userMessage);
}

assistant.chat("What is the capital of Germany?")
    .onNext(new Consumer<String>() {
        @Override
        public void accept(String s) {
            // Cancel the operation here
            tokenStream.cancel();
        }
    })
    .onComplete(response -> {
        futureAnswer.complete(answerBuilder.toString());
        futureResponse.complete(response);
    })
    .onError(futureAnswer::completeExceptionally)
    .start();

通过这些修改,你现在可以通过在TokenStream示例上调用cancel方法来取消onNext方法内的流响应。

详情

来源

这个响应旨在提供帮助并节省你的时间。它不是精确解决方案,而是你自己的研究的一个起点。
帮助我变得更有用!如果这有帮助请留下👍,如果无关请留下👎。

  • 要继续对话,请提及@dosu。*
z9gpfhce

z9gpfhce4#

上述方法并没有真正终止与大型模型的流式请求;它只是忽略了它。随着大型模型的响应继续到来,它们将不断进入onNext方法并返回,直到所有输出完成。

相关问题