@nicoloboschi Sorry, I'm a complete newbie. Where should I add this line of code? Pasting the complete node code here:
from langchain_core.vectorstores import VectorStore
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    BoolInput,
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data
class AstraVectorStoreComponent(LCVectorStoreComponent):
display_name: str = "Astra DB"
description: str = "使用Astra DB实现的具有搜索功能的向量存储"
documentation: str = " [https://python.langchain.com/docs/integrations/vectorstores/astradb](https://python.langchain.com/docs/integrations/vectorstores/astradb) "
name = "AstraDB"
icon: str = "AstraDB"
_cached_vectorstore: VectorStore | None = None
    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within Astra DB where the vectors will be stored.",
            required=True,
        ),
        SecretStrInput(
            name="token",
            display_name="Astra DB Application Token",
            info="Authentication token for accessing Astra DB.",
            value="ASTRA_DB_APPLICATION_TOKEN",
            required=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="API Endpoint",
            info="API endpoint URL for the Astra DB service.",
            value="ASTRA_DB_API_ENDPOINT",
            required=True,
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Optional namespace within Astra DB to use for the collection.",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding or Astra Vectorize",
            input_types=["Embeddings", "dict"],
            info="Allows either an embedding model or an Astra Vectorize configuration.",  # TODO: This should be optional, but need to refactor langchain-astradb first.
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info="Optional dictionary defining the indexing policy for the collection.",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. (when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]
    def _build_vector_store(self):
        # Cache the vector store to avoid re-initializing it and ingesting the data again.
        if self._cached_vectorstore:
            return self._cached_vectorstore

        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError:
            raise ImportError(
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError:
            raise ValueError(f"Invalid setup mode: {self.setup_mode}")

        if not isinstance(self.embedding, dict):
            embedding_dict = {"embedding": self.embedding}
        else:
            from astrapy.info import CollectionVectorServiceOptions

            dict_options = self.embedding.get("collection_vector_service_options", {})
            dict_options["authentication"] = {
                k: v for k, v in dict_options.get("authentication", {}).items() if k and v
            }
            dict_options["parameters"] = {k: v for k, v in dict_options.get("parameters", {}).items() if k and v}
            embedding_dict = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(dict_options)
            }
            collection_embedding_api_key = self.embedding.get("collection_embedding_api_key")
            if collection_embedding_api_key:
                embedding_dict["collection_embedding_api_key"] = collection_embedding_api_key

        vector_store_kwargs = {
            **embedding_dict,
            "collection_name": self.collection_name,
            "token": self.token,
            "api_endpoint": self.api_endpoint,
            "namespace": self.namespace or None,
            "metric": self.metric or None,
            "batch_size": self.batch_size or None,
            "bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
            "bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
            "bulk_delete_concurrency": self.bulk_delete_concurrency or None,
            "setup_mode": setup_mode_value,
            "pre_delete_collection": self.pre_delete_collection or False,
        }

        if self.metadata_indexing_include:
            vector_store_kwargs["metadata_indexing_include"] = self.metadata_indexing_include
        elif self.metadata_indexing_exclude:
            vector_store_kwargs["metadata_indexing_exclude"] = self.metadata_indexing_exclude
        elif self.collection_indexing_policy:
            vector_store_kwargs["collection_indexing_policy"] = self.collection_indexing_policy

        try:
            vector_store = AstraDBVectorStore(**vector_store_kwargs)
        except Exception as e:
            raise ValueError(f"Error initializing AstraDBVectorStore: {str(e)}") from e

        self._add_documents_to_vector_store(vector_store)

        self._cached_vectorstore = vector_store

        return vector_store
    def _add_documents_to_vector_store(self, vector_store):
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                raise ValueError("Vector Store Inputs must be Data objects.")

        if documents:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                raise ValueError(f"Error adding documents to AstraDBVectorStore: {str(e)}") from e
        else:
            logger.debug("No documents to add to the Vector Store.")
    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args
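To illustrate the filter cleaning in _build_search_args above: entries whose key or value is empty are dropped before the filter is handed to the search call. A minimal standalone sketch (the sample filter values are made up for illustration):

# Standalone illustration of the filter-cleaning comprehension used above.
search_filter = {"source": "report.pdf", "author": "", "": "ignored"}

# Entries with an empty key or an empty value are dropped.
clean_filter = {k: v for k, v in search_filter.items() if k and v}

print(clean_filter)  # {'source': 'report.pdf'}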
8 answers
bvhaajcl1#
Hi @selvabharathG! I'm here to help with any bugs, questions, or contributions while you wait for a human maintainer. Let's solve this together!

To troubleshoot a data-processing pipeline that handles PDF files without errors but fails to load records into Astra DB, you can follow these steps:

1. Check file loading. The FileComponent class supports specific text file types and raises an error for unsupported ones. Verify that the parse_text_file_to_record function handles PDF files correctly.
2. Verify parsing. Make sure parse_text_file_to_record parses the PDF correctly and returns a valid Record object. The function uses parse_pdf_to_text to extract the text from the PDF. On a successful parse, the expected output is a Data object containing the extracted text and metadata, including the file path (see https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/base/data/utils.py).
3. Check silent_errors. When silent_errors is set to True, errors during file loading do not raise exceptions, which can let the pipeline continue without loading any data. Set silent_errors to False to catch and debug any errors during file loading.
4. Check the data flow. Ensure the parsed Record is passed to the downstream component responsible for inserting the data into Astra DB, and verify that the data flow between components is set up correctly.

By following these steps, you should be able to identify and resolve the issue that is preventing records from being loaded into Astra DB.
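If you want to rule out the PDF parsing step in isolation, a quick standalone check can help. The sketch below deliberately does not call langflow's internal parse_pdf_to_text helper (whose exact signature is not shown here); it uses the pypdf package as a stand-in to confirm the file yields extractable text at all:

# Sanity check: does the PDF contain extractable text?
# Uses pypdf directly (pip install pypdf) as a stand-in for langflow's parser.
from pypdf import PdfReader

def pdf_has_text(path: str) -> bool:
    reader = PdfReader(path)
    text = "".join(page.extract_text() or "" for page in reader.pages)
    print(f"{path}: {len(reader.pages)} pages, {len(text)} characters extracted")
    return bool(text.strip())

pdf_has_text("document.pdf")  # hypothetical path; use the PDF you are ingesting

If this prints zero extracted characters, the PDF is likely image-only (scanned) and would need OCR before any text can be indexed.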
d7v8vwbk2#
Hi @selvabharathG. I just tried version 1.0.13 and it works fine for me. Maybe something is going wrong during processing.
Could you try running langflow with debug logging activated?
You should add a line like this:
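export LANGFLOW_LOG_LEVEL=debug
langflow run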
ie3xauqp3#
Hi @selvabharathG. I just tried version 1.0.13 and it works fine for me. Maybe something is going wrong during processing. Could you try running langflow with debug logging activated?
You should add a line like this:
export LANGFLOW_LOG_LEVEL=debug
langflow run
Thanks for the reply, @nicoloboschi. Which flow do I need to add this line to?
bxfogqkk4#
The AstraDB node should log this information when it indexes the data. That depends on the size of the PDF, of course.
f2uvfpb95#
_cached_vectorstore: VectorStore | None = None
inputs = [
StrInput(
name="collection_name",
display_name="Collection Name",
info="The name of the collection within Astra DB where the vectors will be stored.",
required=True,
),
SecretStrInput(
name="token",
display_name="Astra DB Application Token",
info="Authentication token for accessing Astra DB.",
value="ASTRA_DB_APPLICATION_TOKEN",
required=True,
),
SecretStrInput(
name="api_endpoint",
display_name="API Endpoint",
info="API endpoint URL for the Astra DB service.",
value="ASTRA_DB_API_ENDPOINT",
required=True,
),
MultilineInput(
name="search_input",
display_name="Search Input",
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
StrInput(
name="namespace",
display_name="Namespace",
info="Optional namespace within Astra DB to use for the collection.",
advanced=True,
),
DropdownInput(
name="metric",
display_name="Metric",
info="Optional distance metric for vector comparisons in the vector store.",
options=["cosine", "dot_product", "euclidean"],
advanced=True,
),
IntInput(
name="batch_size",
display_name="Batch Size",
info="Optional number of data to process in a single batch.",
advanced=True,
),
IntInput(
name="bulk_insert_batch_concurrency",
display_name="Bulk Insert Batch Concurrency",
info="Optional concurrency level for bulk insert operations.",
advanced=True,
),
IntInput(
name="bulk_insert_overwrite_concurrency",
display_name="Bulk Insert Overwrite Concurrency",
info="Optional concurrency level for bulk insert operations that overwrite existing data.",
advanced=True,
),
IntInput(
name="bulk_delete_concurrency",
display_name="Bulk Delete Concurrency",
info="Optional concurrency level for bulk delete operations.",
advanced=True,
),
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
options=["Sync", "Async", "Off"],
advanced=True,
value="Sync",
),
BoolInput(
name="pre_delete_collection",
display_name="Pre Delete Collection",
info="Boolean flag to determine whether to delete the collection before creating a new one.",
advanced=True,
),
StrInput(
name="metadata_indexing_include",
display_name="Metadata Indexing Include",
info="Optional list of metadata fields to include in the indexing.",
advanced=True,
),
HandleInput(
name="embedding",
display_name="Embedding or Astra Vectorize",
input_types=["Embeddings", "dict"],
info="Allows either an embedding model or an Astra Vectorize configuration.", # TODO: This should be optional, but need to refactor langchain-astradb first.
),
StrInput(
name="metadata_indexing_exclude",
display_name="Metadata Indexing Exclude",
info="Optional list of metadata fields to exclude from the indexing.",
advanced=True,
),
StrInput(
name="collection_indexing_policy",
display_name="Collection Indexing Policy",
info="Optional dictionary defining the indexing policy for the collection.",
advanced=True,
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
advanced=True,
value=4,
),
DropdownInput(
name="search_type",
display_name="Search Type",
info="Search type to use",
options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
value="Similarity",
advanced=True,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results. (when using 'Similarity with score threshold')",
value=0,
advanced=True,
),
DictInput(
name="search_filter",
display_name="Search Metadata Filter",
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
is_list=True,
),
]
def _build_vector_store(self):
# cache the vector store to avoid re-initializing and ingest data again
if self._cached_vectorstore:
return self._cached_vectorstore
def _add_documents_to_vector_store(self, vector_store):
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
raise ValueError("Vector Store Inputs must be Data objects.")
def _map_search_type(self):
if self.search_type == "Similarity with score threshold":
return "similarity_score_threshold"
elif self.search_type == "MMR (Max Marginal Relevance)":
return "mmr"
else:
return "similarity"
    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args
def search_documents(self) -> list[Data]:
vector_store = self._build_vector_store()
def get_retriever_kwargs(self):
search_args = self._build_search_args()
return {
"search_type": self._map_search_type(),
"search_kwargs": search_args,
}
def build_vector_store(self):
vector_store = self._build_vector_store()
return vector_store
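For context, the dict returned by get_retriever_kwargs lines up with the keyword arguments that LangChain's VectorStore.as_retriever accepts, so the component's search settings translate directly into a retriever. A minimal sketch of that wiring, assuming component is an instance of the class above and a recent langchain-core where retrievers expose invoke():

# Hypothetical usage outside Langflow: reuse the component's settings
# to build a LangChain retriever over the same Astra DB collection.
vector_store = component.build_vector_store()
retriever = vector_store.as_retriever(**component.get_retriever_kwargs())
docs = retriever.invoke("example query")  # returns a list of Documents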
k5ifujac6#
Hey @selvabharathG, sorry, maybe I wasn't clear. You don't need to add any line of code. If you set LANGFLOW_LOG_LEVEL=debug before running langflow, you will see that log line in the output.
gg58donl7#
@selvabharathG, just checking in on whether you solved the problem, so I can close this issue.
tcomlyy68#
Hi @nicoloboschi,
Sorry for the late reply; I was traveling.
I can't apply the suggested approach, because I'm trying to use this flow in the DataStax online GUI.
Is there any other way to identify the problem?