跳到主要内容

Python API

RAGFlow的Python API完整参考。在继续之前,请确保您已经准备好用于身份验证的RAGFlow API密钥

注意事项

运行以下命令下载Python SDK:

pip install ragflow-sdk

错误代码

代码消息描述
400请求无效请求参数无效
401未经授权访问未授权
403禁止访问访问被拒绝
404资源不存在资源未找到
500内部服务器错误服务器内部错误
1001片段ID无效片段ID无效
1002片段更新失败更新片段失败

兼容OpenAI的API

创建聊天完成

通过OpenAI的API为给定的历史对话创建模型响应。

参数

model: str,必填

用于生成响应的模型。服务器会自动解析此参数,因此您可以将其设置为任何值。

messages: list[object],必填

用于生成响应的历史聊天消息列表。必须包含至少一个具有user角色的消息。

stream: boolean

是否以流的形式接收响应。如果您希望一次性收到整个响应而不是分段形式,请显式将此参数设为false

返回值

  • 成功:OpenAI的消息格式的响应
  • 失败:Exception

示例

from openai import OpenAI

model = "模型"
client = OpenAI(api_key="ragflow-api-key", base_url=f"http://ragflow_address/api/v1/chats_openai/<chat_id>")

stream = True
reference = True

completion = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "您是一个乐于助人的助手"},
{"role": "user", "content": "你是谁?"},
{"role": "assistant", "content": "我是一位名叫...的AI助手"},
{"role": "user", "content": "你能告诉我如何安装neovim吗"}
],
stream=stream,
extra_body={"reference": reference}
)

if stream:
for chunk in completion:
print(chunk)
if reference and chunk.choices[0].finish_reason == "stop":
print(f"参考:\n{chunk.choices[0].delta.reference}")
print(f"最终内容:\n{chunk.choices[0].delta.final_content}")
else:
print(completion.choices[0].message.content)
if reference:
print(completion.choices[0].message.reference)

数据集管理

创建数据集

RAGFlow.create_dataset(
name: str,
avatar: Optional[str] = None,
description: Optional[str] = None,
embedding_model: Optional[str] = "BAAI/bge-large-zh-v1.5@BAAI",
permission: str = "me",
chunk_method: str = "naive",
parser_config: DataSet.ParserConfig = None
) -> DataSet

创建数据集。

参数

name: str,必填

要创建的数据集的唯一名称。必须满足以下要求:

  • 最长128个字符。
  • 不区分大小写。
avatar: str

头像的Base64编码,默认为None

description: str

数据集的简短描述,默认为None

permission

指定谁可以访问以创建该数据集。可选值:

  • "me":(默认)只有您自己可以管理数据集。
  • "team":所有团队成员都可以管理数据集。
chunk_method, str

要创建的数据集的分段方法。可选项包括:

  • "naive": 通用(默认)
  • "manual: 手动
  • `"qa": Q&A
  • `"table": 表格
  • `"paper": 论文
  • `"book": 图书
  • `"laws": 法律
  • `"presentation": 演示文稿
  • `"picture": 图片
  • `"one": 一个
  • `"email": 邮件
parser_config

数据集的解析配置。ParserConfig 对象的属性根据选择的 chunk_method 而有所不同:

  • chunk_method="naive":
    {"chunk_token_num":512,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}.
  • chunk_method="qa":
    {"raptor": {"use_raptor": False}}
  • chunk_method="manuel":
    {"raptor": {"use_raptor": False}}
  • chunk_method="table":
    None
  • chunk_method="paper":
    {"raptor": {"use_raptor": False}}
  • chunk_method="book":
    {"raptor": {"use_raptor": False}}
  • chunk_method="laws":
    {"raptor": {"use_raptor": False}}
  • chunk_method="picture":
    None
  • chunk_method="presentation":
    {"raptor": {"use_raptor": False}}
  • chunk_method="one":
    None
  • chunk_method="knowledge-graph":
    {"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
  • chunk_method="email":
    None

返回

  • 成功:返回一个 dataset 对象。
  • 失败:抛出异常。

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")

删除数据集

RAGFlow.delete_datasets(ids: list[str] | None = None)

通过 ID 删除数据集。

参数

ids: list[str]None, 必选

要删除的数据集的 ID 列表。默认值为 None

  • 如果 None,则所有数据集将被删除。
  • 如果是一个包含多个 ID 的数组,则仅删除指定的数据集。
  • 如果是空数组,则不执行任何操作。

返回

  • 成功:无返回值。
  • 失败:抛出异常。

示例代码

rag_object.delete_datasets(ids=["d94a8dc02c9711f0930f7fbc369eab6d","e94a8dc02c9711f0930f7fbc369eab6e"])

列出数据集

RAGFlow.list_datasets(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[DataSet]

列出数据集。

参数

page: int

指定要显示的数据集页面。默认值为 1

page_size: int

每页的数据集数量。默认值为 30

orderby: str

按以下字段排序:

  • "create_time" (默认)
  • "update_time"
desc: bool

指示返回的数据集是否应按降序排列。默认值为 True

id: str

要检索的特定数据集 ID。默认值为 None

name: str

要检索的特定数据集名称。默认值为 None

返回

  • 成功:返回一个包含 DataSet 对象列表。
  • 失败:抛出异常。

示例代码

列出所有数据集
for dataset in rag_object.list_datasets():
print(dataset)
通过 ID 获取数据集信息
dataset = rag_object.list_datasets(id="id_1")
print(dataset[0])

更新数据集配置

DataSet.update(update_message: dict)

更新当前数据集的配置。

参数

update_message: dict[str, str|int], 必选

一个字典,表示需要更新的属性,包含以下键:

  • "name": str 数据集的新名称。
    • 只支持基本多语言平面(BMP)
    • 最长 128 个字符
    • 不区分大小写
  • "avatar": (Body 参数), string
    新的头像 base64 编码。
    • 最大长度为 65535 字符。
  • "embedding_model": (Body 参数), string
    更新后的嵌入模型名称。
    • 在更新前请确保 "chunk_count"0
    • 最长 255 个字符
    • 必须遵循 model_name@model_factory 格式。
  • "permission": (Body 参数), string
    更新后的数据集权限。可用选项:
    • "me" (默认):只有您自己可以管理该数据集。
    • "team":团队所有成员都可以管理该数据集。
  • "pagerank": (Body 参数), int
    参见 Set page rank
    • 默认值为 0
    • 最小值为 0
    • 最大值为 100
  • "chunk_method": (Body 参数), enum<string>
    数据集的分块方法。可用选项:
    • "naive" (默认): 一般
    • "book": 图书
    • "email": 邮件
    • "laws": 法律条文
    • "manual": 手动
    • "one": 单一数据集
    • "paper": 论文
    • "picture": 图片
    • "presentation": 演示文稿
    • "qa": Q&A
    • "table": 表格

返回

  • 成功:无返回值。
  • 失败:抛出异常。

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_name")[0]
dataset.update({"embedding_model":"BAAI/bge-zh-v1.5", "chunk_method":"manual"})

数据集内的文件管理


上传文档

DataSet.upload_documents(document_list: list[dict])

将文档上传到当前数据集中。

参数

document_list: list[dict], 必需

代表要上传的文档字典列表,每个字典包含以下键:

  • "display_name":(可选)在数据集中显示的文件名。
  • "blob":(可选)待上传文件的二进制内容。

返回

  • 成功时无返回值。
  • 失败时抛出异常。

示例

dataset = rag_object.create_dataset(name="kb_name")
dataset.upload_documents([{"display_name": "1.txt", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}, {"display_name": "2.pdf", "blob": "<BINARY_CONTENT_OF_THE_DOC>"}])

更新文档

Document.update(update_message:dict)

更新当前文档的配置信息。

参数

update_message: dict[str, str|dict[]], 必需

代表要更新的属性字典,包含以下键:

  • "display_name"str 要更新的文档名称。

  • "meta_fields"dict[str, Any] 文档的元字段信息。

  • "chunk_method"str 用于解析文档的方法。

    • "naive": 普通方法
    • "manual": 手动方法
    • "qa": Q&A 方法
    • "table": 表格方法
    • "paper": 学术论文方法
    • "book": 图书方法
    • "laws": 法规方法
    • "presentation": 演示文稿方法
    • "picture": 图片方法
    • "one": 单一方法
    • "email": 邮件方法
  • "parser_config"dict[str, Any] 文档的解析配置。其属性取决于选择的 "chunk_method"

    • "chunk_method"="naive": {"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}
    • chunk_method="qa": {"raptor": {"use_raptor": False}}
    • chunk_method="manual": {"raptor": {"use_raptor": False}}
    • chunk_method="table":
      None
    • chunk_method="paper": {"raptor": {"use_raptor": False}}
    • chunk_method="book": {"raptor": {"use_raptor": False}}
    • chunk_method="laws":
      {"raptor": {"use_raptor": False}}
    • chunk_method="presentation": {"raptor": {"use_raptor": False}}
    • chunk_method="picture": None
    • chunk_method="one": None
    • chunk_method="knowledge-graph":
      {"chunk_token_num":128,"delimiter":"\\n","entity_types":["organization","person","location","event","time"]}
    • chunk_method="email": None

返回

  • 成功时无返回值。
  • 失败时抛出异常。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id='id')
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
doc.update([{"parser_config": {"chunk_token_num": 256}}, {"chunk_method": "manual"}])

下载文档

Document.download() -> bytes

下载当前文档。

返回

以字节形式返回下载的文档。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="id")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
open("~/ragflow.txt", "wb+").write(doc.download())
print(doc)

列出文档

Dataset.list_documents(
id: str = None,
keywords: str = None,
page: int = 1,
page_size: int = 30,
order_by: str = "create_time",
desc: bool = True,
create_time_from: int = 0,
create_time_to: int = 0
) -> list[Document]

列出当前数据集中的文档。

参数

id: str

要检索的文档ID。默认为None

keywords: str

用于匹配文档标题的关键字。默认为None

page: int

指定文档显示的页码。默认值为1。

page_size: int

每一页上最多显示的文档数量。默认值为30。

order_by: str

按哪个字段对文档进行排序。可选选项:

  • "create_time"(默认)
  • "update_time"
desc: bool

指示是否以降序方式获取检索到的文档。默认为True

create_time_from: int

Unix时间戳,用于过滤创建时间在此时刻之后的文档。0表示无过滤条件。默认值为0。

create_time_to: int

Unix时间戳,用于过滤创建时间在此时刻之前的文档。0表示无过滤条件。默认值为0。

返回值

  • 成功:返回Document对象列表。
  • 失败:抛出异常

一个Document对象包含以下属性:

  • id: 文档ID。默认为""
  • name: 文档名称。默认为""
  • thumbnail: 文档缩略图。默认为None
  • dataset_id: 与文档关联的数据集ID。默认为None
  • chunk_method: 分块方法名。默认为"naive"
  • source_type: 文档的来源类型。默认为"local"
  • type: 文档类型或类别。默认为空字符串,预留将来使用。
  • created_by: str 创建文档的用户名称。默认为""
  • size: int 以字节为单位的文档大小。默认为0
  • token_count: int 文档中的 token 数量。默认为0
  • chunk_count: int 文档的分块数量。默认为0
  • progress: float 当前处理进度,以百分比表示。默认为0.0
  • progress_msg: str 表示当前进度状态的消息。默认为空字符串。
  • process_begin_at: datetime 文档处理开始的时间。默认为None
  • process_duration: float 处理时间(秒)。默认为0.0
  • run: str 文档的处理状态:
    • "UNSTART" (默认)
    • "RUNNING"
    • "CANCEL"
    • "DONE"
    • "FAIL"
  • status: str 预留将来使用。
  • parser_config: ParserConfig 解析器配置对象。其属性根据选择的chunk_method而变化:
    • chunk_method="naive":
      {"chunk_token_num":128,"delimiter":"\\n","html4excel":False,"layout_recognize":True,"raptor":{"use_raptor":False}}
    • chunk_method="qa":
      {"raptor": {"use_raptor": False}}
    • chunk_method="manuel":
      {"raptor": {"use_raptor": False}}
    • chunk_method="table":
      None
    • chunk_method="paper":
      {"raptor": {"use_raptor": False}}
    • chunk_method="book":
      {"raptor": {"use_raptor": False}}
    • chunk_method="laws":
      {"raptor": {"use_raptor": False}}
    • chunk_method="presentation":
      {"raptor": {"use_raptor": False}}
    • chunk_method="picure":
      None
    • chunk_method="one":
      None
    • chunk_method="email":
      None

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="kb_1")

filename1 = "~/ragflow.txt"
blob = open(filename1, "rb").read()
dataset.upload_documents([{"name": filename1, "blob": blob}])
for doc in dataset.list_documents(keywords="rag", page=0, page_size=12):
print(doc)

删除文档

DataSet.delete_documents(ids: list[str] = None)

通过ID删除文档。

参数

ids: list[list]

要删除的文档的ID列表。默认为None,如果未指定,则会删除数据集中所有的文档。

返回值

  • 成功:无返回。
  • 失败:抛出异常。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="kb_1")
dataset = dataset[0]
dataset.delete_documents(ids=["id_1", "id_2"])

解析文档

DataSet.async_parse_documents(document_ids: list[str]) -> None

解析当前数据集中的文档。

参数

document_ids: list[str], 必需

要解析的文档ID列表。

返回值

  • 成功:无返回。
  • 失败:抛出异常。

示例

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt', "rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt', "rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt', "rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("异步批量解析已启动。")

停止解析文档

DataSet.async_cancel_parse_documents(document_ids: list[str]) -> None

停止指定文档的解析。

参数

document_ids: list[str], 必需

需要停止解析的文档ID列表。

返回值

  • 成功:无返回。
  • 失败:抛出异常。

示例

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.create_dataset(name="dataset_name")
documents = [
{'display_name': 'test1.txt', 'blob': open('./test_data/test1.txt',"rb").read()},
{'display_name': 'test2.txt', 'blob': open('./test_data/test2.txt',"rb").read()},
{'display_name': 'test3.txt', 'blob': open('./test_data/test3.txt',"rb").read()}
]
dataset.upload_documents(documents)
documents = dataset.list_documents(keywords="test")
ids = []
for document in documents:
ids.append(document.id)
dataset.async_parse_documents(ids)
print("异步批量解析已启动。")
dataset.async_cancel_parse_documents(ids)
print("异步批量解析已取消。")

数据集内的分块管理


添加分块

Document.add_chunk(content: str, important_keywords: list[str] = []) -> Chunk

向当前文档添加一个分块。

参数

content: str, 必填

该分块的文本内容。

important_keywords: list[str]

与该分块关联的关键术语或短语。默认为空列表。

返回值

  • 成功:Chunk 对象。
  • 失败:异常对象。

一个 Chunk 对象包含以下属性:

  • id: str: 分块 ID。
  • content: str 分块的文本内容。
  • important_keywords: list[str] 与该分块关联的关键术语或短语列表。
  • create_time: str 分块创建的时间(添加到文档时)。
  • create_timestamp: float 表示分块创建时间戳,以自1970年1月1日以来的秒数表示。
  • dataset_id: str 关联的数据集 ID。
  • document_name: str 关联的文档名称。
  • document_id: str 关联的文档 ID。
  • available: bool 分块在数据集中的可用状态。可选值:
    • False: 不可用
    • True: 可用(默认)

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(id="123")
dataset = datasets[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")

列出分块

Document.list_chunks(keywords: str = None, page: int = 1, page_size: int = 30, id : str = None) -> list[Chunk]

列出当前文档中的所有分块。

参数

keywords: str

用于匹配分块内容的关键词。默认为None

page: int

指定显示分块的页码。默认值为1。

page_size: int

每页的最大分块数量。默认值为30。

id: str

要获取的分块 ID。默认为None,表示列出所有分块信息。

返回值

  • 成功:返回一个包含多个 Chunk 对象的列表。
  • 失败:异常对象。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets("123")
dataset = dataset[0]
docs = dataset.list_documents(keywords="test", page=1, page_size=12)
for chunk in docs[0].list_chunks(keywords="rag", page=0, page_size=12):
print(chunk)

删除分块

Document.delete_chunks(chunk_ids: list[str])

通过 ID 删除分块。

参数

chunk_ids: list[str]

要删除的分块 ID 列表。默认为None,表示删除当前文档中的所有分块。

返回值

  • 成功:无返回值。
  • 失败:异常对象

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
doc.delete_chunks(["id_1","id_2"])

更新分块

Chunk.update(update_message: dict)

更新当前分块的内容或配置信息。

参数

update_message: dict[str, str|list[str]|bool] 必填

表示要更新的属性字典,包含以下键:

  • "content": str 分块的文本内容。
  • "important_keywords": list[str] 与该分块关联的关键术语或短语列表。
  • "available": bool 分块在数据集中的可用状态。可选值:
    • False: 不可用
    • True: 可用(默认)

返回值

  • 成功:无返回值。
  • 失败:异常对象。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(id="123")
dataset = dataset[0]
doc = dataset.list_documents(id="wdfxb5t547d")
doc = doc[0]
chunk = doc.add_chunk(content="xxxxxxx")
chunk.update({"content":"sdfx..."})

获取文档片段

RAGFlow.retrieve(question:str="", dataset_ids:list[str]=None, document_ids=list[str]=None, page:int=1, page_size:int=30, similarity_threshold:float=0.2, vector_similarity_weight:float=0.3, top_k:int=1024,rerank_id:str=None,keyword:bool=False,highlight:bool=False) -> list[Chunk]

从指定的数据集中检索文档片段。

参数

question: str, 必填

用户查询或查询关键字。默认为""

dataset_ids: list[str], 必填

要搜索的数据集ID列表。默认为None

document_ids: list[str]

要检索的文档ID列表。默认为None。您必须确保所选所有文档使用相同的嵌入模型,否则会出现错误。

page: int

要检索的文档起始索引。默认为1

page_size: int

一次最多检索的片段数量。默认为30

similarity_threshold: float

最小相似度分数。默认值为0.2

vector_similarity_weight: float

向量余弦相似性权重。默认值为0.3。如果x代表向量余弦相似性,则(1 - x)是术语相似性的权重。

top_k: int

参与向量余弦计算的片段数量。默认为1024

rerank_id: str

重排序模型ID。默认为None

keyword: bool

是否启用基于关键词匹配:

  • True: 启用基于关键词匹配。
  • False: 禁用基于关键词匹配(默认)。
highlight: bool

指定结果中是否高亮显示匹配的术语:

  • True: 高亮显示匹配的术语。
  • False: 不高亮显示匹配的术语(默认)。
cross_languages: list[string]

为了在不同语言中实现关键词检索,应翻译成的目标语言列表。

返回值

  • 成功:包含文档片段对象的列表。
  • 失败:抛出异常。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
dataset = rag_object.list_datasets(name="ragflow")
dataset = dataset[0]
name = 'ragflow_test.txt'
path = './test_data/ragflow_test.txt'
documents =[{"display_name":"test_retrieve_chunks.txt","blob":open(path, "rb").read()}]
docs = dataset.upload_documents(documents)
doc = docs[0]
doc.add_chunk(content="This is a chunk addition test")
for c in rag_object.retrieve(dataset_ids=[dataset.id],document_ids=[doc.id]):
print(c)

聊天助手管理


创建聊天助手

RAGFlow.create_chat(
name: str,
avatar: str = "",
dataset_ids: list[str] = [],
llm: Chat.LLM = None,
prompt: Chat.Prompt = None
) -> Chat

创建一个聊天助手。

参数

name: str, 必填

聊天助手的名字。

avatar: str

头像的Base64编码。默认为""

dataset_ids: list[str]

相关数据集的ID列表。默认为[""]

llm: Chat.LLM

用于创建聊天助手的语言模型设置。默认为None。当值为None时,将生成一个包含以下属性的字典作为默认值:

  • model_name: str 聊天模型名称。如果它为None,则使用用户的默认聊天模型。
  • temperature: float 控制模型预测随机性的参数。温度越低,响应更保守;温度越高,则生成更具创意和多样化的响应。默认值为0.1
  • top_p: float 知道作“核采样”的这个参数设置了一个阈值来从一个较小的单词集中选择样本。它聚焦于最可能出现的词,忽略不太可能出现的那些词。默认值为0.3
  • presence_penalty: float 通过惩罚在对话中已经出现过的词汇来抑制模型重复信息的趋势。默认值为0.2
  • frequency penalty: float 类似于出现频次惩罚,它减少了模型频繁重复使用同一个词的倾向。默认值为0.7

请直接输出翻译后的中文内容,不要添加任何额外说明。

提示:Chat.Prompt

LLM需要遵循的指令。一个 Prompt 对象包含以下属性:

  • similarity_threshold: float RAGFlow 在检索过程中采用加权关键词相似度和加权向量余弦相似度的组合,或者使用加权关键词相似度与重排序模型得分的组合。如果相似度分数低于此阈值,则对应的段落将从结果中排除。默认值为 0.2
  • keywords_similarity_weight: float 此参数设置在混合相似度评分(向量余弦相似度或重排序模型相似度)中的关键词相似性的权重。通过调整这个权重,可以控制关键词相似性相对于其他相似性指标的影响程度。默认值为 0.7
  • top_n: int 该参数指定了相似度分数高于 similarity_threshold 的前 N 个段落的数量,这些段落将被传送到LLM中进行处理。LLM 只会访问这“N”个最相关的段落。默认值为 8
  • variables: list[dict[]] 此参数列出用于 Chat Configurations 中的 "System" 字段中的变量列表。注意:
    • knowledge 是一个保留变量,代表检索到的段落。
    • 在 "System" 中的所有变量都应该使用大括号 {}
    • 默认值为 [{"key": "knowledge", "optional": True}]
  • rerank_model: str 如果没有指定,则默认使用向量余弦相似度;如果指定了,将使用重排序得分。默认为 ""
  • top_k: int 用于重新排序或从列表或集合中选择基于特定排名准则的前 K 个项目的过程。默认值为1024。
  • empty_response: str 如果用户的问题在数据集中没有任何检索到的内容,此字段将被用作响应内容。为了允许 LLM 在未找到任何相关内容时自由发挥,请保持为空白,默认值为 None
  • opener: str 对用户的问候语。默认值为 "Hi! I am your assistant, can I help you?"
  • show_quote: bool 表示是否显示文本的来源。默认值为 True
  • prompt: str 提示内容。

返回

  • 成功:一个代表聊天助手的 Chat 对象。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_ids = []
for dataset in datasets:
dataset_ids.append(dataset.id)
assistant = rag_object.create_chat("Miss R", dataset_ids=dataset_ids)

更新聊天助手

Chat.update(update_message: dict)

更新当前聊天助手的配置。

参数

更新消息:dict[str, str|list[str]|dict[]], 必填

表示要更新的属性字典,包含以下键:

  • "name": str 修订后的聊天助手名称。
  • "avatar": str 头像的Base64编码,默认为 ""
  • "dataset_ids": list[str] 要更新的数据集ID列表。
  • "llm": dict LLM设置:
    • "model_name", str 聊天模型名称。
    • "temperature", float 控制模型预测的随机性。较低的温度会产生更保守的回答,较高的温度则会生成更具创意和多样性的回答。
    • "top_p", float 又称“核采样”,此参数设置了一个阈值以选择较小的一组单词进行采样。
    • "presence_penalty", float 通过惩罚在对话中出现过的词语,这可以防止模型重复相同的信息。
    • "frequency penalty", float 类似于存在惩罚项,这会减少模型重用同一个词的倾向性。
  • "prompt" : LLM需要遵循的指令。
    • "similarity_threshold": float RAGFlow 在检索过程中使用加权关键词相似度和向量余弦相似度或重新排名分数的组合。此参数设置用户查询与块之间的相似度阈值。如果一个相似性得分低于该阈值,则对应的块将不包括在结果中,默认值为 0.2
    • "keywords_similarity_weight": float 设置关键词相似度与向量余弦相似度或重新排名模型相似度的混合相似度分数中的权重。通过调整此权重,可以控制关键字相似性与其他相似度指标的关系,默认值为 0.7
    • "top_n": int 此参数指定了超过 similarity_threshold 相似得分的前N个块的数量。LLM仅访问这些'前N个' 块,默认值是 8
    • "variables": list[dict[]] 这个参数列出了在“Chat Configurations”中的“System”字段中使用的变量。注意:
      • knowledge 是保留的变量,代表检索到的块。
      • “System” 中的所有变量都应使用大括号表示,默认值为 [{"key": "knowledge", "optional": True}]
    • "rerank_model": str 如果未指定,则将使用向量余弦相似度;否则使用重新排名分数。默认为空字符串。
    • "empty_response": str 如果在数据集中没有检索到任何信息,此内容作为回复。为了允许LLM在没有任何检索时进行即兴创作,请留空,默认值为 None
    • "opener": str 用户的问候语,默认为 "Hi! I am your assistant, can I help you?"
    • "show_quote: bool 表示是否显示文本来源, 默认为 True.
    • "prompt": str 指令内容。

返回值

  • 成功:无返回值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
datasets = rag_object.list_datasets(name="kb_1")
dataset_id = datasets[0].id
assistant = rag_object.create_chat("Miss R", dataset_ids=[dataset_id])
assistant.update({"name": "Stefan", "llm": {"temperature": 0.8}, "prompt": {"top_n": 8}})

删除聊天助手

RAGFlow.delete_chats(ids: list[str] = None)

根据ID删除聊天助手。

参数

ids: list[str]

要删除的聊天助手的ID列表,默认为None。如果为空或未指定,则系统中的所有聊天助手将被删除。

返回值

  • 成功:无返回值。
  • 失败:Exception

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_chats(ids=["id_1","id_2"])

列出聊天助手

RAGFlow.list_chats(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Chat]

列出聊天助手。

参数

page: int

指定显示的页面,默认为1

page_size: int

每页显示的聊天助手数量,默认为30

orderby: str

结果按哪个属性排序。可选值:

  • "create_time"(默认)
  • "update_time"
desc: bool

是否以降序方式检索聊天助手, 默认为 True

id: str

要获取的聊天助手ID,默认为 None

名称:str

要检索的聊天助手名称。默认为 None

返回值

  • 成功:一个包含Chat对象的列表。
  • 失败:Exception

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for assistant in rag_object.list_chats():
print(assistant)

会话管理


创建聊天助手的会话

Chat.create_session(name: str = "New session") -> Session

创建当前聊天助手的新会话。

参数

名称:str

要创建的会话名称。

返回值

  • 成功:包含以下属性的 Session 对象:
    • id: str 自动生成的唯一标识符。
    • name: str 创建的会话名称。
    • message: list[Message] 创建的会话初始消息。默认为 [{"role": "assistant", "content": "你好!我是你的助手,我能帮你吗?"}]
    • chat_id: str 相关聊天助手的 ID。
  • 失败:Exception

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()

更新聊天助手的会话信息

Session.update(update_message: dict)

更新当前聊天助手的当前会话信息。

参数

update_message: dict[str, Any],必需

表示要更新属性的字典。可以指定以下键:

  • "name"str 会话的新名称。

返回值

  • 成功:不返回任何值。
  • 失败:Exception

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session("session_name")
session.update({"name": "updated_name"})

列出聊天助手的会话

Chat.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
name: str = None
) -> list[Session]

列出与当前聊天助手关联的会话。

参数

page: int

指定显示会话的页面号。默认为1

page_size: int

每页包含的会话语数量。默认为30

orderby: str

按哪个字段排序会话:

  • "create_time"(默认)
  • "update_time"
desc: bool

指示是否按降序返回会话,默认为True

id: str

要获取的聊天会话 ID。默认为None

name: str

要检索的聊天会话名称。默认为None

返回值

  • 成功:与当前聊天助手关联的所有会话对象列表。
  • 失败:Exception

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
for session in assistant.list_sessions():
print(session)

删除聊天助手的会话

Chat.delete_sessions(ids:list[str] = None)

通过 ID 删除当前聊天助手的会话。

参数

ids: list[str]

要删除的会话语 ID 列表。默认为None。如果未指定,将删除所有与当前聊天助手关联的会话。

返回值

  • 成功:不返回任何值。
  • 失败:Exception

示例代码

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
assistant.delete_sessions(ids=["id_1","id_2"])

与聊天助手对话

Session.ask(question: str = "", stream: bool = False, **kwargs) -> Optional[Message, iter[Message]]

向指定的聊天助手提问以启动人工智能驱动的会话。

提示

在流模式下,系统判断是否包含引用信息。

参数

question: str,必需

要开始对话的问题。默认为空字符串。

stream: bool

指示是否启用流式输出:

  • True: 启用流(默认)。
  • False: 禁用流。
**kwargs

提示中的参数(system)。

返回值

  • 如果设置streamFalse,则返回一个包含问题响应的Message对象。
  • 如果设置streamTrue,则返回多个message对象的迭代器(iter[Message])。

以下是Message对象的属性:

id: str

自动生成的消息ID。

content: str

消息的内容。默认值为 "Hi! I am your assistant, can I help you?".

reference: list[Chunk]

表示引用消息的一系列Chunk对象列表,每个对象包含以下属性:

  • id str
    • 单元的ID。
  • content str
    • 单元的内容。
  • img_id str
    • 表示单元快照的ID。仅当单元源自图片、PPT、PPTX或PDF文件时适用。
  • document_id str
    • 引用文档的ID。
  • document_name str
    • 引用文档的名字。
  • position list[str]
    • 单元在引用文档中的位置信息。
  • dataset_id str
    • 引用文档所属数据集的ID。
  • similarity float
    • 表示单元从0到1的综合相似度评分,数值越大表示越相似。它是vector_similarityterm_similarity的加权总和。
  • vector_similarity float
    • 单元从0到1的向量相似性评分,数值越大表示向量嵌入之间更相似。
  • term_similarity float
    • 单元从0到1的关键词相似性评分,数值越大表示关键词之间越相似。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
assistant = rag_object.list_chats(name="Miss R")
assistant = assistant[0]
session = assistant.create_session()

print("\n==================== Miss R =====================\n")
print("你好。我能为你做些什么呢?")

while True:
question = input("\n==================== 用户 =====================\n> ")
print("\n==================== Miss R =====================\n")

cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content

与代理创建会话

Agent.create_session(**kwargs) -> Session

使用当前代理创建一个新会话。

参数

**kwargs

begin组件中的参数。

返回值

  • 成功:包含以下属性的Session对象:
    • id: str 自动生成的唯一标识符,表示创建的会话。
    • message: list[Message] 创建的会话助手的消息。默认为 [{"role": "assistant", "content": "Hi! I am your assistant, can I help you?"}]
    • agent_id: str 关联代理的ID。
  • 失败:抛出异常

示例

from ragflow_sdk import RAGFlow, Agent

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
agent_id = "AGENT_ID"
agent = rag_object.list_agents(id=agent_id)[0]
session = agent.create_session()

与代理对话

Session.ask(question: str="", stream: bool=False) -> Optional[Message, iter[Message]]

向指定的代理提问以启动一次基于AI的对话。

注意

在流模式下,并非所有响应都包含引用,这取决于系统的判断。

参数

question: str

启动基于AI对话的问题。如果Begin组件接受参数,则无需提供问题。

stream: bool

表示是否以流的方式输出响应:

  • True: 启用流(默认)。
  • False: 禁用流。

返回值

  • 如果设置streamFalse,则返回一个包含问题响应的Message对象。
  • 如果设置streamTrue,则返回多个message对象的迭代器(iter[Message])。

以下是Message对象的属性:

id: str

自动生成的消息ID。

content: str

消息的内容。默认值为 "Hi! I am your assistant, can I help you?".

引用: list[Chunk]

表示消息引用的一系列Chunk对象列表,每个对象包含以下属性:

  • id str 占块ID。
  • content str 块的内容。
  • image_id str 块的快照ID。仅当该块来源为图像、PPT、PPTX或PDF文件时有效。
  • document_id str 引用文档的ID。
  • document_name str 引用文档的名称。
  • position list[str] 块在引用文档中的位置信息。
  • dataset_id str 引用文档所属的数据集ID。
  • similarity float 块的综合相似度评分,范围为01,数值越高表示相似性越大。它是向量相似度和术语相似度的加权总和。
  • vector_similarity float 块的向量相似度评分,范围为01,数值越高表示嵌入向量之间的相似性越大。
  • term_similarity float 块的关键字相似度评分,范围为01,数值越高表示关键字之间越相似。

示例

from ragflow_sdk import RAGFlow, Agent

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id=AGENT_id)[0]
session = agent.create_session()

print("\n===== Miss R ====\n")
print("您好。我能为您做些什么?")

while True:
question = input("\n===== 用户 ====\n> ")
print("\n==== Miss R ====\n")

cont = ""
for ans in session.ask(question, stream=True):
print(ans.content[len(cont):], end='', flush=True)
cont = ans.content

列出代理会话

Agent.list_sessions(
page: int = 1,
page_size: int = 30,
orderby: str = "update_time",
desc: bool = True,
id: str = None
) -> List[Session]

列出与当前代理相关的会话。

参数

page: int

指定显示会话的页面。默认值为1

page_size: int

每页中的会话数量。默认值为30

orderby: str

按照哪个字段排序会话。可用选项:

  • "create_time"
  • "update_time"(默认)
desc: bool

是否按降序排列检索到的会话。默认值为True

id: str

要检索的代理会话ID。默认值为None

返回

  • 成功:与当前代理相关的Session对象列表。
  • 失败:抛出异常。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id=AGENT_id)[0]
sessions = agent.list_sessions()
for session in sessions:
print(session)

删除代理的会话

Agent.delete_sessions(ids: list[str] = None)

通过ID删除代理的会话。

参数

ids: list[str]

要删除的会话的ID列表。默认值为None。如果未指定,则将删除与该代理相关联的所有会话。

返回

  • 成功:不返回任何值。
  • 失败:抛出异常。

示例

from ragflow_sdk import RAGFlow

rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
AGENT_id = "AGENT_ID"
agent = rag_object.list_agents(id=AGENT_id)[0]
agent.delete_sessions(ids=["id_1","id_2"])

代理管理


列出代理

RAGFlow.list_agents(
page: int = 1,
page_size: int = 30,
orderby: str = "create_time",
desc: bool = True,
id: str = None,
title: str = None
) -> List[Agent]

列出代理。

参数

page: int

指定显示代理的页面。默认值为1

page_size: int

每页中的代理数量。默认值为30

orderby: str

结果按照哪个属性排序。可用选项:

  • "create_time"(默认)
  • "update_time"
desc: bool

是否按降序排列检索到的代理。默认值为True

id: str

要检索的代理ID。默认值为None

name: str

要检索的代理名称。默认值为None

返回

  • 成功:代理对象列表。
  • 失败:抛出异常。

示例

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
for agent in rag_object.list_agents():
print(agent)

创建代理

RAGFlow.create_agent(
title: str,
dsl: dict,
description: str | None = None
) -> None

创建一个代理。

参数

标题: str

指定代理的标题。

dsl: dict

指定代理的画布DSL(领域特定语言)。

描述: str

代理的描述。默认为None

返回值

  • 成功: 无。
  • 失败: Exception

示例代码

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.create_agent(
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)

更新代理

RAGFlow.update_agent(
agent_id: str,
title: str | None = None,
description: str | None = None,
dsl: dict | None = None
) -> None

更新一个代理。

参数

agent_id: str

指定要更新的代理ID。

title: str

指定新的代理标题。如果不想更新标题则为None

dsl: dict

指定新的画布DSL(领域特定语言)。如果不想更新DSL则为None

description: str

指定新的代理描述。如果不想更新描述则为None

返回值

  • 成功: 无。
  • 失败: Exception

示例代码

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.update_agent(
agent_id="58af890a2a8911f0a71a11b922ed82d6",
title="Test Agent",
description="A test agent",
dsl={
# ... canvas DSL here ...
}
)

删除代理

RAGFlow.delete_agent(
agent_id: str
) -> None

删除一个代理。

参数

agent_id: str

指定要删除的代理ID。

返回值

  • 成功: 无。
  • 失败: Exception

示例代码

from ragflow_sdk import RAGFlow
rag_object = RAGFlow(api_key="<YOUR_API_KEY>", base_url="http://<YOUR_BASE_URL>:9380")
rag_object.delete_agent("58af890a2a8911f0a71a11b922ed82d6")