llama_index [Bug]: Cannot use Qdrant DB in QueryFusionRetriever

8ulbf1ek posted 3 months ago in Other

Bug Description

Here is my code:

from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
# import nest_asyncio

# nest_asyncio.apply()

qdrant_client = QdrantClient(
    path = ""
)
docstore = SimpleDocumentStore()
docstore.add_documents(nodes)

vector_store = QdrantVectorStore(client=qdrant_client, collection_name="_v1")

storage_context = StorageContext.from_defaults(
    docstore=docstore, vector_store=vector_store
)

index = VectorStoreIndex(nodes=nodes, storage_context=storage_context)

from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.retrievers import QueryFusionRetriever

dense_retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=5
)

sparse_retriever = BM25Retriever.from_defaults(docstore=index.docstore, similarity_top_k=5)

retriever = QueryFusionRetriever(
    [
        dense_retriever,
        sparse_retriever,
    ],
    num_queries=1,
    use_async=True,
    retriever_weights=[0.5, 0.5],
    similarity_top_k=5,
    mode="relative_score",
    verbose=True,
)
import nest_asyncio
nest_asyncio.apply()
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.response.notebook_utils import display_source_node

reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-12-v2", top_n=5)

nodes = retriever.retrieve("bail")
nodes = reranker.postprocess_nodes(nodes)
for node in nodes:
    print(node.metadata['file_name'])
    #print("---")
    display_source_node(node, source_length=5000)

Note that dense_retriever works fine on its own, without QueryFusionRetriever, and the whole code also works with chromadb.

Version

0.11.1

Steps to Reproduce

I have provided the corresponding code above.

Relevant Logs/Tracebacks

AttributeError                            Traceback (most recent call last)
Cell In[18], line 8
      4 from llama_index.core.response.notebook_utils import display_source_node
      6 reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-12-v2", top_n=5)
----> 8 nodes = retriever.retrieve("bail")
      9 nodes = reranker.postprocess_nodes(nodes)
     10 for node in nodes:

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:261, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs)
    253 self.span_enter(
    254     id_=id_,
    255     bound_args=bound_args,
   (...)
    258     tags=tags,
    259 )
    260 try:
--> 261     result = func(*args, **kwargs)
    262 except BaseException as e:
    263     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py:245, in BaseRetriever.retrieve(self, str_or_query_bundle)
    240 with self.callback_manager.as_trace("query"):
    241     with self.callback_manager.event(
    242         CBEventType.RETRIEVE,
    243         payload={EventPayload.QUERY_STR: query_bundle.query_str},
    244     ) as retrieve_event:
--> 245         nodes = self._retrieve(query_bundle)
    246         nodes = self._handle_recursive_retrieval(query_bundle, nodes)
    247         retrieve_event.on_end(
    248             payload={EventPayload.NODES: nodes},
    249         )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:261, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs)
    253 self.span_enter(
    254     id_=id_,
    255     bound_args=bound_args,
   (...)
    258     tags=tags,
    259 )
    260 try:
--> 261     result = func(*args, **kwargs)
    262 except BaseException as e:
    263     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/retrievers/fusion_retriever.py:261, in QueryFusionRetriever._retrieve(self, query_bundle)
    258     queries.extend(self._get_queries(query_bundle.query_str))
    260 if self.use_async:
--> 261     results = self._run_nested_async_queries(queries)
    262 else:
    263     results = self._run_sync_queries(queries)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/retrievers/fusion_retriever.py:220, in QueryFusionRetriever._run_nested_async_queries(self, queries)
    217         tasks.append(retriever.aretrieve(query))
    218         task_queries.append((query.query_str, i))
--> 220 task_results = run_async_tasks(tasks)
    222 results = {}
    223 for query_tuple, query_result in zip(task_queries, task_results):

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:77, in run_async_tasks(tasks, show_progress, progress_bar_desc)
     74 async def _gather() -> List[Any]:
     75     return await asyncio.gather(*tasks_to_execute)
---> 77 outputs: List[Any] = asyncio_run(_gather())
     78 return outputs

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:33, in asyncio_run(coro)
     30     loop = asyncio.get_event_loop()
     32     # If we're here, there's an existing loop but it's not running
---> 33     return loop.run_until_complete(coro)
     35 except RuntimeError as e:
     36     # If we can't get the event loop, we're likely in a different thread, or its already running
     37     try:

File /opt/conda/lib/python3.10/site-packages/nest_asyncio.py:98, in _patch_loop.<locals>.run_until_complete(self, future)
     95 if not f.done():
     96     raise RuntimeError(
     97         'Event loop stopped before Future completed.')
---> 98 return f.result()

File /opt/conda/lib/python3.10/asyncio/futures.py:201, in Future.result(self)
    199 self.__log_traceback = False
    200 if self._exception is not None:
--> 201     raise self._exception.with_traceback(self._exception_tb)
    202 return self._result

File /opt/conda/lib/python3.10/asyncio/tasks.py:234, in Task.__step(***failed resolving arguments***)
    232         result = coro.send(None)
    233     else:
--> 234         result = coro.throw(exc)
    235 except StopIteration as exc:
    236     if self._must_cancel:
    237         # Task is cancelled right before coro stops.

File /opt/conda/lib/python3.10/site-packages/llama_index/core/async_utils.py:75, in run_async_tasks.<locals>._gather()
     74 async def _gather() -> List[Any]:
---> 75     return await asyncio.gather(*tasks_to_execute)

File /opt/conda/lib/python3.10/asyncio/tasks.py:304, in Task.__wakeup(self, future)
    302 def __wakeup(self, future):
    303     try:
--> 304         future.result()
    305     except BaseException as exc:
    306         # This may also be a cancellation.
    307         self.__step(exc)

File /opt/conda/lib/python3.10/asyncio/tasks.py:232, in Task.__step(***failed resolving arguments***)
    228 try:
    229     if exc is None:
    230         # We use the `send` method directly, because coroutines
    231         # don't have `__iter__` and `__next__` methods.
--> 232         result = coro.send(None)
    233     else:
    234         result = coro.throw(exc)

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:291, in Dispatcher.span.<locals>.async_wrapper(func, instance, args, kwargs)
    283 self.span_enter(
    284     id_=id_,
    285     bound_args=bound_args,
   (...)
    288     tags=tags,
    289 )
    290 try:
--> 291     result = await func(*args, **kwargs)
    292 except BaseException as e:
    293     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/base/base_retriever.py:276, in BaseRetriever.aretrieve(self, str_or_query_bundle)
    271 with self.callback_manager.as_trace("query"):
    272     with self.callback_manager.event(
    273         CBEventType.RETRIEVE,
    274         payload={EventPayload.QUERY_STR: query_bundle.query_str},
    275     ) as retrieve_event:
--> 276         nodes = await self._aretrieve(query_bundle=query_bundle)
    277         nodes = await self._ahandle_recursive_retrieval(
    278             query_bundle=query_bundle, nodes=nodes
    279         )
    280         retrieve_event.on_end(
    281             payload={EventPayload.NODES: nodes},
    282         )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/instrumentation/dispatcher.py:291, in Dispatcher.span.<locals>.async_wrapper(func, instance, args, kwargs)
    283 self.span_enter(
    284     id_=id_,
    285     bound_args=bound_args,
   (...)
    288     tags=tags,
    289 )
    290 try:
--> 291     result = await func(*args, **kwargs)
    292 except BaseException as e:
    293     self.event(SpanDropEvent(span_id=id_, err_str=str(e)))

File /opt/conda/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py:112, in VectorIndexRetriever._aretrieve(self, query_bundle)
    108         embed_model = self._embed_model
    109         embedding = await embed_model.aget_agg_embedding_from_queries(
    110             query_bundle.embedding_strs
    111         )
--> 112 return await self._aget_nodes_with_embeddings(
    113     QueryBundle(query_str=query_bundle.query_str, embedding=embedding)
    114 )

File /opt/conda/lib/python3.10/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py:184, in VectorIndexRetriever._aget_nodes_with_embeddings(self, query_bundle_with_embeddings)
    180 async def _aget_nodes_with_embeddings(
    181     self, query_bundle_with_embeddings: QueryBundle
    182 ) -> List[NodeWithScore]:
    183     query = self._build_vector_store_query(query_bundle_with_embeddings)
--> 184     query_result = await self._vector_store.aquery(query, **self._kwargs)
    185     return self._build_node_list_from_query_result(query_result)

File /opt/conda/lib/python3.10/site-packages/llama_index/vector_stores/qdrant/base.py:968, in QdrantVectorStore.aquery(self, query, **kwargs)
    966     return self.parse_to_query_result(response[0])
    967 else:
--> 968     response = await self._aclient.search(
    969         collection_name=self.collection_name,
    970         query_vector=query_embedding,
    971         limit=query.similarity_top_k,
    972         query_filter=query_filter,
    973     )
    975     return self.parse_to_query_result(response)

AttributeError: 'NoneType' object has no attribute 'search'

5lwkijsr1#

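Hey @harshil1973! I'm here to help you with any issues or questions while you wait for a human maintainer. Let's solve this bug together!
I found a similar closed issue that may help you resolve the AttributeError: 'NoneType' object has no attribute 'search' error when using QueryFusionRetriever with Qdrant DB. The error indicates that the _aclient attribute of your QdrantVectorStore is None, meaning the async client was never initialized. In the posted code only client= is passed to QdrantVectorStore, while use_async=True makes the fusion retriever call the async query path.
To fix this, make sure the QdrantVectorStore instance is initialized with valid client and aclient instances, or with the necessary parameters (url, api_key, collection_name) so that the class can initialize its own client instances.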
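When initializing QdrantVectorStore, make sure that both QdrantClient and AsyncQdrantClient are created with the correct URL and API key. If you are using :memory: mode, note that data is not synchronized between the two clients.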
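A minimal sketch of that initialization, assuming a Qdrant server is reachable over HTTP. The helper name build_qdrant_store and its api_key parameter are illustrative assumptions, not part of the original answer; the client, aclient and collection_name arguments are the ones the answer refers to.

```python
from typing import Optional


def build_qdrant_store(url: str, collection_name: str, api_key: Optional[str] = None):
    """Create a QdrantVectorStore backed by a sync and an async client.

    Hypothetical helper: the name and parameters are assumptions for this sketch.
    """
    # Imported lazily so the sketch only needs these packages when it is called.
    from llama_index.vector_stores.qdrant import QdrantVectorStore
    from qdrant_client import AsyncQdrantClient, QdrantClient

    # Both clients talk to the same server, so they see the same collection.
    client = QdrantClient(url=url, api_key=api_key)
    aclient = AsyncQdrantClient(url=url, api_key=api_key)

    # Passing aclient is what lets async retrieval (use_async=True) work.
    return QdrantVectorStore(
        client=client,
        aclient=aclient,
        collection_name=collection_name,
    )
```

Calling build_qdrant_store("http://localhost:6333", "_v1") would need a running server at that (assumed) address; the returned store can then replace the vector_store in the original snippet.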
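You may also want to add a few debugging statements to check that _aclient is being initialized correctly.
This will help you determine whether the client is being set to None, and why.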
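One way such a check could look is sketched below. The helper client_status is hypothetical; _aclient is the attribute named in the traceback, and _client is assumed to be its sync counterpart. A stand-in object is used here so the sketch runs without a Qdrant installation.

```python
from types import SimpleNamespace
from typing import Any, Dict


def client_status(vector_store: Any) -> Dict[str, bool]:
    """Report which of the two Qdrant clients are set on a vector store."""
    return {
        # getattr with a default avoids a second AttributeError while debugging.
        "sync_client_set": getattr(vector_store, "_client", None) is not None,
        "async_client_set": getattr(vector_store, "_aclient", None) is not None,
    }


# Stand-in that mimics a store created with client=... only (no aclient).
store_like = SimpleNamespace(_client=object(), _aclient=None)
print(client_status(store_like))
# prints {'sync_client_set': True, 'async_client_set': False}
```

Running client_status(vector_store) on the real store right before retriever.retrieve(...) should show async_client_set as False in the failing setup.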
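For more details, you can refer to the relevant part of the QdrantVectorStore class initialization.
If these steps do not resolve the issue, please provide more information about how you initialize QdrantVectorStore in your code.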


s8vozzvw3#

@logan-markewich Is there any way to use a local vector database with both clients, like this? It fails with the error: "Storage folder /kaggle/working/ is already accessed by another instance of Qdrant client. If you require concurrent access, use Qdrant server instead."

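from qdrant_client import AsyncQdrantClient, QdrantClient

# Both clients open the same on-disk folder; local mode allows only one of them
qdrant_client = QdrantClient(
    path="/kaggle/working/"
)
qdrant_aclient = AsyncQdrantClient(
    path="/kaggle/working/"
)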

yrwegjxp4#

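Not sure. This seems to be a limitation of qdrant (I always use qdrant's docker server image, and it works fine).
The workarounds are to use another vector database, use the server's docker image, set use_async=False in the retriever, or write your own retriever that skips the async vector store operations (writing this as a workflow would actually be fun).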
