llama_index [Question]: LlamaIndex Bedrock always raises "Task exception was never retrieved"

mxg2im7a · asked 4 months ago · in: Other
Follow (0) | Answers (6) | Views (56)

Question validation

  • I have searched the documentation and Discord for an answer.

Question

Hi, I'm using llama-index-bedrock-converse@0.2.2 and, on CBEventType.LLM, I always get `NotImplementedError('Use async-for instead')`. I'm not sure where the issue is coming from; it could be from here:

from aiostream import stream  # provides stream.merge for interleaving async generators

event_handler = EventCallbackHandler()
chat_engine.callback_manager.handlers.append(event_handler)  # type: ignore
response = await chat_engine.astream_chat(last_message_content, messages)

async def content_generator():
    # Yield the text response
    async def _text_generator():
        async for token in response.async_response_gen():
            yield VercelStreamResponse.convert_text(token)
        # the text generator is the leading stream; once it finishes, also finish the event stream
        event_handler.is_done = True

    # Yield the events from the event handler
    async def _event_generator():
        async for event in event_handler.async_event_gen():
            event_response = event.to_response()
            if event_response is not None:
                yield VercelStreamResponse.convert_data(event_response)

    # Merge both streams and forward whichever items arrive first
    combine = stream.merge(_text_generator(), _event_generator())
    async with combine.stream() as streamer:
        async for output in streamer:
            yield output
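
For context, a minimal sketch of how such a generator could be consumed, assuming a FastAPI streaming endpoint; the route and wiring here are illustrative assumptions, not taken from the snippet above:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/api/chat")
async def chat_endpoint() -> StreamingResponse:
    # ... build chat_engine, event_handler, and content_generator as above ...
    # The media type depends on how VercelStreamResponse frames its chunks.
    return StreamingResponse(content_generator(), media_type="text/plain")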

Could you help me point out what configuration I'm missing? I've been stuck on this. Thanks.


ca1c2owp #2

My Bedrock configuration:

import os
from typing import Dict

from llama_index.core import Settings

def init_bedrock():
    from llama_index.llms.bedrock_converse import BedrockConverse
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding

    model_map: Dict[str, str] = {"llama3-70b": "meta.llama3-70b-instruct-v1:0"}

    Settings.llm = BedrockConverse(model=model_map[os.getenv("MODEL")])

    embed_model_map: Dict[str, str] = {
        "all-MiniLM-L6-v2": "sentence-transformers/all-MiniLM-L6-v2",
        "all-mpnet-base-v2": "sentence-transformers/all-mpnet-base-v2",
    }
    Settings.embed_model = HuggingFaceEmbedding(
        model_name=embed_model_map[os.getenv("EMBEDDING_MODEL")]
    )
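
For reference, a hypothetical invocation of this setup; the environment variable values are illustrative and must match keys in the maps above:

import os

# Illustrative values; any keys from model_map / embed_model_map above work.
os.environ["MODEL"] = "llama3-70b"
os.environ["EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

init_bedrock()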
daolsyd0 #3

Hey @gilangdipper! I'm here to help you with any bugs, questions, or contributions while you wait for a human maintainer.
To resolve the `NotImplementedError('Use async-for instead')` error in llama-index-bedrock-converse@0.2.2 on CBEventType.LLM, you need to make sure that event handling happens asynchronously. The code you provided does not currently support asynchronous event handling, which is most likely causing the error.
Here is an updated SimpleLLMHandler class with asynchronous methods for handling events:

import logging
from typing import Any, Dict, List, Optional, cast

from llama_index.core.callbacks.pythonically_printing_base_handler import (
    PythonicallyPrintingBaseHandler,
)
from llama_index.core.callbacks.schema import CBEventType, EventPayload

class SimpleLLMHandler(PythonicallyPrintingBaseHandler):
    """Callback handler for printing llms inputs/outputs."""

    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
        super().__init__(
            event_starts_to_ignore=[], event_ends_to_ignore=[], logger=logger
        )

    async def start_trace(self, trace_id: Optional[str] = None) -> None:
        return

    async def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        return

    async def _print_llm_event(self, payload: dict) -> None:
        from llama_index.core.llms import ChatMessage

        if EventPayload.PROMPT in payload:
            prompt = str(payload.get(EventPayload.PROMPT))
            completion = str(payload.get(EventPayload.COMPLETION))

            self._print(f"** Prompt: **\n{prompt}")
            self._print("*" * 50)
            self._print(f"** Completion: **\n{completion}")
            self._print("*" * 50)
            self._print("\n")
        elif EventPayload.MESSAGES in payload:
            messages = cast(List[ChatMessage], payload.get(EventPayload.MESSAGES, []))
            messages_str = "\n".join([str(x) for x in messages])
            response = str(payload.get(EventPayload.RESPONSE))

            self._print(f"** Messages: **\n{messages_str}")
            self._print("*" * 50)
            self._print(f"** Response: **\n{response}")
            self._print("*" * 50)
            self._print("\n")

    async def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        return event_id

    async def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Count the LLM or Embedding tokens as needed."""
        if event_type == CBEventType.LLM and payload is not None:
            await self._print_llm_event(payload)

This code updates the SimpleLLMHandler class to handle events with asynchronous methods (async def), which should resolve the `NotImplementedError('Use async-for instead')` error. Also, before implementing this, make sure that your LLM supports asynchronous operations.
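
To check that last point, a minimal sketch (assuming AWS credentials and region are configured in the environment) that exercises BedrockConverse's async streaming directly, outside of any chat engine or callback handler:

import asyncio

from llama_index.core.llms import ChatMessage
from llama_index.llms.bedrock_converse import BedrockConverse

async def main() -> None:
    llm = BedrockConverse(model="meta.llama3-70b-instruct-v1:0")
    # astream_chat returns an async generator of incremental chat responses
    response_gen = await llm.astream_chat([ChatMessage(role="user", content="Hello")])
    async for chunk in response_gen:
        print(chunk.delta, end="", flush=True)

asyncio.run(main())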

  • To continue talking to @dosu, mention @dosu.
qf9go6mv #4

Hi @dosu, could you help me check my code?

import json
import asyncio
from typing import AsyncGenerator, Dict, Any, List, Optional
from llama_index.core.callbacks.base import BaseCallbackHandler
from llama_index.core.callbacks.schema import CBEventType
from llama_index.core.tools.types import ToolOutput
from pydantic import BaseModel

class CallbackEvent(BaseModel):
    event_type: CBEventType
    payload: Optional[Dict[str, Any]] = None
    event_id: str = ""

    def get_retrieval_message(self) -> dict | None:
        if self.payload:
            nodes = self.payload.get("nodes")
            if nodes:
                msg = f"Retrieved {len(nodes)} sources to use as context for the query"
            else:
                msg = f"Retrieving context for query: '{self.payload.get('query_str')}'"
            return {
                "type": "events",
                "data": {"title": msg},
            }
        else:
            return None

    def get_tool_message(self) -> dict | None:
        func_call_args = self.payload.get("function_call")
        if func_call_args is not None and "tool" in self.payload:
            tool = self.payload.get("tool")
            return {
                "type": "events",
                "data": {
                    "title": f"Calling tool: {tool.name} with inputs: {func_call_args}",
                },
            }

    def _is_output_serializable(self, output: Any) -> bool:
        try:
            json.dumps(output)
            return True
        except TypeError:
            return False

    def get_agent_tool_response(self) -> dict | None:
        response = self.payload.get("response")
        if response is not None:
            sources = response.sources
            for source in sources:
                # Return the tool response here to include the toolCall information
                if isinstance(source, ToolOutput):
                    if self._is_output_serializable(source.raw_output):
                        output = source.raw_output
                    else:
                        output = source.content

                    return {
                        "type": "tools",
                        "data": {
                            "toolOutput": {
                                "output": output,
                                "isError": source.is_error,
                            },
                            "toolCall": {
                                "id": None,  # There is no tool id in the ToolOutput
                                "name": source.tool_name,
                                "input": source.raw_input,
                            },
                        },
                    }

    def to_response(self):
        match self.event_type:
            case "retrieve":
                return self.get_retrieval_message()
            case "function_call":
                return self.get_tool_message()
            case "agent_step":
                return self.get_agent_tool_response()
            case _:
                return None

class EventCallbackHandler(BaseCallbackHandler):
    _aqueue: asyncio.Queue
    is_done: bool = False

    def __init__(
        self,
    ):
        """Initialize the base callback handler."""
        ignored_events = [
            # CBEventType.CHUNKING,
            # CBEventType.NODE_PARSING,
            # CBEventType.EMBEDDING,
            # CBEventType.LLM,
            # CBEventType.TEMPLATING,
        ]
        super().__init__(ignored_events, ignored_events)
        self._aqueue = asyncio.Queue()

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        if event.to_response() is not None:
            self._aqueue.put_nowait(event)

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        print(event)

        if event.to_response() is not None:
            self._aqueue.put_nowait(event)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """No-op."""

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """No-op."""

    async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
        while not self._aqueue.empty() or not self.is_done:
            try:
                yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                pass
68de4m5k #5

Your llama_index code for handling callback events looks mostly correct, but there are a few improvements and potential issues to address:

  1. Initialization of is_done: make sure is_done is initialized correctly in the constructor.
  2. Handling None payloads: add checks to handle the case where payload may be None.
  3. Closing the generator properly: make sure the generator in async_event_gen terminates cleanly once is_done is set to True.

Here is a revised version of your code with these improvements:
import json
import asyncio
from typing import AsyncGenerator, Dict, Any, List, Optional
from llama_index.core.callbacks.base import BaseCallbackHandler
from llama_index.core.callbacks.schema import CBEventType
from llama_index.core.tools.types import ToolOutput
from pydantic import BaseModel

class CallbackEvent(BaseModel):
    event_type: CBEventType
    payload: Optional[Dict[str, Any]] = None
    event_id: str = ""

    def get_retrieval_message(self) -> dict | None:
        if self.payload:
            nodes = self.payload.get("nodes")
            if nodes:
                msg = f"Retrieved {len(nodes)} sources to use as context for the query"
            else:
                msg = f"Retrieving context for query: '{self.payload.get('query_str')}'"
            return {
                "type": "events",
                "data": {"title": msg},
            }
        return None

    def get_tool_message(self) -> dict | None:
        if self.payload:
            func_call_args = self.payload.get("function_call")
            if func_call_args is not None and "tool" in self.payload:
                tool = self.payload.get("tool")
                return {
                    "type": "events",
                    "data": {
                        "title": f"Calling tool: {tool.name} with inputs: {func_call_args}",
                    },
                }
        return None

    def _is_output_serializable(self, output: Any) -> bool:
        try:
            json.dumps(output)
            return True
        except TypeError:
            return False

    def get_agent_tool_response(self) -> dict | None:
        if self.payload:
            response = self.payload.get("response")
            if response is not None:
                sources = response.sources
                for source in sources:
                    if isinstance(source, ToolOutput):
                        output = source.raw_output if self._is_output_serializable(source.raw_output) else source.content
                        return {
                            "type": "tools",
                            "data": {
                                "toolOutput": {
                                    "output": output,
                                    "isError": source.is_error,
                                },
                                "toolCall": {
                                    "id": None,
                                    "name": source.tool_name,
                                    "input": source.raw_input,
                                },
                            },
                        }
        return None

    def to_response(self):
        match self.event_type:
            case "retrieve":
                return self.get_retrieval_message()
            case "function_call":
                return self.get_tool_message()
            case "agent_step":
                return self.get_agent_tool_response()
            case _:
                return None

class EventCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        """Initialize the base callback handler."""
        ignored_events = [
            # CBEventType.CHUNKING,
            # CBEventType.NODE_PARSING,
            # CBEventType.EMBEDDING,
            # CBEventType.LLM,
            # CBEventType.TEMPLATING,
        ]
        super().__init__(ignored_events, ignored_events)
        self._aqueue = asyncio.Queue()
        self.is_done = False

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        if event.to_response() is not None:
            self._aqueue.put_nowait(event)
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        print(event)
        if event.to_response() is not None:
            self._aqueue.put_nowait(event)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """No-op."""

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """No-op."""

    async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
        while not self._aqueue.empty() or not self.is_done:
            try:
                yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                pass

This should address the issues around initialization, handling None payloads, and closing the generator properly.
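
Note that the revised async_event_gen still polls the queue with a 0.1-second timeout until is_done flips. A minimal alternative sketch, under the assumption that the producer is changed to enqueue a None sentinel when the leading stream finishes, which makes termination explicit without polling:

from typing import AsyncGenerator, Optional

async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
    while True:
        # Block until the producer enqueues an event or the None sentinel.
        event: Optional[CallbackEvent] = await self._aqueue.get()
        if event is None:
            # Sentinel: the producer would call self._aqueue.put_nowait(None)
            # when the text stream completes, instead of setting is_done.
            break
        yield event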

  • To continue talking to Dosu, mention @dosu.
83qze16e #6

@dosu, can we improve some aspects related to this issue? Using llama-index-bedrock-converse@0.2.2, on CBEventType.LLM, I always get `NotImplementedError('Use async-for instead')`; I'm not sure where the issue is coming from. It could be from here.
