WeKnora与LangChain集成指南打造智能文档处理流水线1. 为什么需要WeKnora与LangChain协同工作在实际的AI应用开发中我们常常面临一个现实问题单一框架很难完美覆盖所有需求。WeKnora作为腾讯开源的文档理解框架擅长处理复杂格式文档、多模态内容解析和企业级知识管理而LangChain则是构建LLM应用的通用框架提供了丰富的链式调用、工具集成和记忆管理能力。两者结合不是简单叠加而是能力互补。想象一下这样的场景你有一份包含图表、表格和文字的PDF技术文档需要从中提取关键参数并生成符合公司规范的技术报告。WeKnora能精准识别文档结构、提取表格数据、理解图表含义LangChain则能将这些结构化信息组织成逻辑清晰的报告并调用外部API补充最新行业标准。这种分工协作比任何单一框架都更接近真实业务需求。很多开发者尝试过直接用LangChain构建文档系统但很快会遇到瓶颈——文档解析质量不稳定、多模态支持弱、企业级功能缺失。而只用WeKnora又可能受限于其特定架构难以快速集成新工具或实现复杂业务逻辑。本文要解决的正是这个最后一公里问题如何让两个强大框架无缝协作而不是各自为战。2. 环境准备与基础对接2.1 WeKnora本地部署验证在开始集成前确保WeKnora已正确部署并运行。根据官方文档推荐使用Docker Compose方式# 克隆项目并进入目录 git clone https://github.com/Tencent/WeKnora.git cd WeKnora # 复制环境配置文件 cp .env.example .env # 启动服务首次运行会下载镜像需要一些时间 ./scripts/start_all.sh启动后通过以下命令验证各服务状态# 查看容器运行情况 docker compose ps # 检查API健康状态 curl http://localhost:8080/health # 应返回 {status:ok} # 测试基本API调用 curl -X POST http://localhost:8080/api/v1/auth/login \ -H Content-Type: application/json \ -d {username:admin,password:admin}如果遇到数据库初始化问题如relation models does not exist执行以下命令# 进入PostgreSQL容器执行初始化脚本 docker exec -i WeKnora-postgres psql -U postgres -d weknora -f /docker-entrypoint-initdb.d/00-init-db.sql2.2 LangChain环境搭建创建独立的Python环境避免依赖冲突# 创建虚拟环境 python -m venv weknora-langchain-env source weknora-langchain-env/bin/activate # Linux/Mac # weknora-langchain-env\Scripts\activate # Windows # 安装核心依赖 pip install langchain langchain-community langchain-core pydantic pip install requests python-dotenv创建基础配置文件.env存储WeKnora API连接信息# .env WEKNORA_API_BASE_URLhttp://localhost:8080 WEKNORA_API_TOKENyour_jwt_token_here WEKNORA_KNOWLEDGE_BASE_IDyour_knowledge_base_id2.3 建立WeKnora API客户端LangChain本身不直接支持WeKnora需要创建自定义客户端。创建weknora_client.pyimport os import json import requests from typing import List, Dict, Optional from pydantic import BaseModel class WeKnoraClient: def __init__(self, base_url: str None, token: str None): self.base_url base_url or os.getenv(WEKNORA_API_BASE_URL, http://localhost:8080) self.token token or os.getenv(WEKNORA_API_TOKEN) self.headers { Authorization: fBearer {self.token}, Content-Type: application/json } def search_knowledge(self, query: str, knowledge_base_id: str None, top_k: int 5) - List[Dict]: 调用WeKnora知识检索API kb_id knowledge_base_id or os.getenv(WEKNORA_KNOWLEDGE_BASE_ID) if not kb_id: raise ValueError(Knowledge base ID must be provided) url f{self.base_url}/api/v1/knowledge-bases/{kb_id}/search payload { query: query, top_k: top_k } try: response requests.post(url, headersself.headers, jsonpayload, timeout30) response.raise_for_status() return response.json().get(results, []) except requests.exceptions.RequestException as e: print(fWeKnora搜索失败: {e}) return [] def create_session(self, knowledge_base_id: str None) - str: 创建对话会话 kb_id knowledge_base_id or os.getenv(WEKNORA_KNOWLEDGE_BASE_ID) url f{self.base_url}/api/v1/sessions payload {knowledge_base_id: kb_id} try: response requests.post(url, headersself.headers, jsonpayload, timeout10) response.raise_for_status() return response.json().get(session_id, ) except requests.exceptions.RequestException as e: print(fWeKnora会话创建失败: {e}) return # 使用示例 if __name__ __main__: client WeKnoraClient() results client.search_knowledge(如何配置Ollama模型) print(f找到 {len(results)} 个相关结果)3. 构建端到端文档处理流水线3.1 文档预处理与向量化流程WeKnora已经完成了文档解析、分块和向量化的核心工作LangChain主要负责在其基础上构建更高层的处理逻辑。创建document_pipeline.pyfrom langchain_core.documents import Document from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_community.chat_models import ChatOllama from typing import List, Dict, Any import os class WeKnoraDocumentProcessor: def __init__(self, weknora_client): self.client weknora_client # 使用本地Ollama模型作为LLM后端 self.llm ChatOllama( modelos.getenv(LLM_MODEL_NAME, qwen2.5:7b), base_urlos.getenv(OLLAMA_BASE_URL, http://localhost:11434), temperature0.3 ) def weknora_retriever(self, query: str) - List[Document]: WeKnora检索器返回LangChain兼容的Document对象 results self.client.search_knowledge(query) documents [] for result in results: # 将WeKnora结果转换为LangChain Document doc Document( page_contentresult.get(content, ), metadata{ source: result.get(source, unknown), score: result.get(score, 0.0), chunk_id: result.get(chunk_id, ), knowledge_base_id: result.get(knowledge_base_id, ) } ) documents.append(doc) return documents def create_rag_chain(self): 创建RAG处理链 # 定义提示词模板 template 你是一个专业的技术文档助手。请基于以下提供的上下文信息准确回答用户的问题。 如果上下文信息不足以回答问题请如实说明不要编造答案。 上下文信息 {context} 用户问题{question} 你的回答 prompt ChatPromptTemplate.from_template(template) # 创建链式处理 rag_chain ( {context: self.weknora_retriever | self.format_documents, question: RunnablePassthrough()} | prompt | self.llm | StrOutputParser() ) return rag_chain def format_documents(self, docs) - str: 格式化检索到的文档用于提示词 formatted_docs [] for i, doc in enumerate(docs): formatted_docs.append(f文档 {i1}:\n来源: {doc.metadata.get(source, 未知)}\n内容: {doc.page_content[:500]}...) return \n\n.join(formatted_docs) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() processor WeKnoraDocumentProcessor(client) # 创建RAG链 rag_chain processor.create_rag_chain() # 测试查询 result rag_chain.invoke(WeKnora支持哪些文档格式) print(回答:, result)3.2 多阶段处理流水线设计实际业务中单一RAG链往往不够。我们需要更复杂的流水线来处理不同类型的文档任务。创建advanced_pipeline.pyfrom langchain_core.runnables import RunnableParallel, RunnableLambda from langchain_core.output_parsers import JsonOutputParser from langchain_core.pydantic_v1 import BaseModel, Field from typing import List, Dict, Any class DocumentAnalysis(BaseModel): 文档分析结果结构 key_points: List[str] Field(description文档中的关键要点) technical_terms: List[str] Field(description专业术语列表) action_items: List[str] Field(description需要执行的操作项) summary: str Field(description简洁摘要) class AdvancedDocumentPipeline: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama( modelqwen2.5:7b, base_urlhttp://localhost:11434, temperature0.1 ) def create_analysis_chain(self): 创建文档分析链返回结构化结果 parser JsonOutputParser(pydantic_objectDocumentAnalysis) template 你是一个专业的技术文档分析师。请仔细分析以下文档内容 并按照指定JSON格式输出分析结果。确保输出严格符合JSON Schema。 文档内容 {context} 输出格式要求 {format_instructions} 你的分析结果 prompt ChatPromptTemplate.from_template(template).partial( format_instructionsparser.get_format_instructions() ) return ( {context: self._get_document_context} | prompt | self.llm | parser ) def _get_document_context(self, query: str) - str: 获取文档上下文的辅助方法 results self.client.search_knowledge(query, top_k3) context_parts [] for result in results: context_parts.append(f【来源】{result.get(source, 未知)}\n{result.get(content, )}) return \n\n.join(context_parts) def create_multi_step_pipeline(self): 创建多步骤处理流水线 # 步骤1初步检索 retrieval_step RunnableLambda(lambda x: self.client.search_knowledge(x)) # 步骤2关键信息提取 extraction_step RunnableLambda(self._extract_key_info) # 步骤3综合分析 analysis_step self.create_analysis_chain() # 组合流水线 pipeline ( {query: RunnablePassthrough()} | {retrieval: retrieval_step, query: RunnablePassthrough()} | {extraction: extraction_step, query: RunnablePassthrough()} | analysis_step ) return pipeline def _extract_key_info(self, results) - Dict[str, Any]: 从检索结果中提取关键信息 if not results: return {key_points: [], technical_terms: [], action_items: [], summary: } # 简单提取逻辑实际中可替换为更复杂的LLM处理 all_content .join([r.get(content, ) for r in results]) words all_content.split() unique_words list(set(words)) return { key_points: [f关键点{i1} for i in range(min(3, len(results)))], technical_terms: unique_words[:5], action_items: [检查配置, 验证连接, 测试功能], summary: f文档分析完成共找到{len(results)}个相关信息片段 } # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() pipeline AdvancedDocumentPipeline(client) # 创建多步骤流水线 multi_step_chain pipeline.create_multi_step_pipeline() # 执行分析 result multi_step_chain.invoke(WeKnora的部署要求是什么) print(结构化分析结果:, result)4. 性能优化与实用技巧4.1 检索性能调优策略WeKnora默认使用混合检索BM25 向量但在LangChain集成中我们可以进一步优化from langchain_core.runnables import RunnableBranch from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate class OptimizedRetrieval: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama(modelqwen2.5:7b, base_urlhttp://localhost:11434) def adaptive_retriever(self, query: str) - List[Dict]: 自适应检索器根据查询类型选择最佳检索策略 # 首先判断查询类型 classification_prompt ChatPromptTemplate.from_template( 请分类以下用户查询属于哪种类型 - 技术问题涉及具体技术实现、配置、错误解决等 - 概念解释询问定义、原理、特点等 - 操作步骤需要具体操作指导、流程说明 - 数据查询需要查找具体数值、参数、版本等 查询{query} 只需返回类型名称不要解释。 ) classifier classification_prompt | self.llm | StrOutputParser() query_type classifier.invoke({query: query}).strip().lower() # 根据类型调整检索参数 if 技术问题 in query_type or 操作步骤 in query_type: # 技术类查询更注重精确匹配 return self.client.search_knowledge(query, top_k3) elif 概念解释 in query_type: # 概念类查询需要更广泛的语义匹配 return self.client.search_knowledge(query, top_k5, rerankTrue) else: # 默认使用平衡策略 return self.client.search_knowledge(query, top_k4) def hybrid_search_with_fallback(self, query: str) - str: 混合搜索带降级机制 # 尝试WeKnora主检索 results self.adaptive_retriever(query) if not results: # 降级到简单关键词搜索 print(WeKnora检索无结果启用降级搜索) simple_results self._simple_keyword_search(query) if simple_results: results simple_results # 构建响应 if results: context \n\n.join([f【{r.get(source, 来源)}】{r.get(content, )[:300]} for r in results]) return self._generate_response(query, context) else: return 抱歉未能在知识库中找到相关信息。 def _simple_keyword_search(self, query: str) - List[Dict]: 简单的关键词搜索降级方案 # 实际中可以连接到Elasticsearch或其他搜索引擎 # 这里模拟返回空结果 return [] def _generate_response(self, query: str, context: str) - str: 生成最终响应 prompt ChatPromptTemplate.from_template( 你是一个专业的技术助手。请基于以下上下文信息准确回答用户问题。 如果上下文信息不足请如实说明。 上下文 {context} 用户问题{query} 回答 ) chain prompt | self.llm | StrOutputParser() return chain.invoke({query: query, context: context}) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() optimizer OptimizedRetrieval(client) # 测试不同类型的查询 queries [ 如何配置Ollama模型, WeKnora是什么, 部署WeKnora需要多少内存 ] for q in queries: result optimizer.hybrid_search_with_fallback(q) print(f查询: {q}) print(f响应: {result[:100]}...\n)4.2 错误处理与用户体验增强在生产环境中健壮的错误处理至关重要。创建robust_pipeline.pyimport time import logging from functools import wraps from typing import Callable, Any, Optional from langchain_core.runnables import RunnableConfig # 配置日志 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) def retry_on_failure(max_retries: int 3, delay: float 1.0): 重试装饰器 def decorator(func: Callable) - Callable: wraps(func) def wrapper(*args, **kwargs): last_exception None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception e logger.warning(f第{attempt 1}次尝试失败: {e}) if attempt max_retries - 1: time.sleep(delay * (2 ** attempt)) # 指数退避 raise last_exception return wrapper return decorator class RobustDocumentPipeline: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama(modelqwen2.5:7b, base_urlhttp://localhost:11434) retry_on_failure(max_retries2, delay0.5) def safe_search(self, query: str, **kwargs) - List[Dict]: 安全的搜索方法带重试机制 return self.client.search_knowledge(query, **kwargs) def create_resilient_chain(self): 创建具备容错能力的处理链 def resilient_retriever(query: str): try: # 尝试主检索 results self.safe_search(query, top_k5) if results: return results # 尝试模糊搜索 fuzzy_query self._generate_fuzzy_query(query) fuzzy_results self.safe_search(fuzzy_query, top_k3) return fuzzy_results if fuzzy_results else [] except Exception as e: logger.error(f检索过程发生错误: {e}) return [] def fallback_response(query: str) - str: 降级响应生成 return f当前无法访问知识库请稍后再试。您的问题{query} # 创建分支链 chain RunnableBranch( ( lambda x: len(self.safe_search(x, top_k1)) 0, lambda x: self._generate_answer(x, self.safe_search(x, top_k3)) ), fallback_response ) return chain def _generate_fuzzy_query(self, query: str) - str: 生成模糊查询 # 简单的模糊化移除停用词添加同义词 words query.split() if len(words) 2: return .join(words[:-1]) # 移除最后一个词 return query def _generate_answer(self, query: str, results: List[Dict]) - str: 生成答案 if not results: return 未找到相关信息。 context \n\n.join([ f【{r.get(source, 未知)}】{r.get(content, )[:200]} for r in results ]) prompt ChatPromptTemplate.from_template( 请基于以下信息用简洁明了的语言回答问题。 如果信息不完整请说明。 信息 {context} 问题{query} 回答 ) chain prompt | self.llm | StrOutputParser() return chain.invoke({query: query, context: context}) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() robust_pipeline RobustDocumentPipeline(client) # 测试容错能力 chain robust_pipeline.create_resilient_chain() result chain.invoke(WeKnora的安装步骤是什么) print(容错响应:, result)5. 实际应用场景演示5.1 企业内部知识问答系统创建完整的应用入口app.pyfrom fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel from typing import List, Dict, Any import uvicorn from weknora_client import WeKnoraClient from document_pipeline import WeKnoraDocumentProcessor from advanced_pipeline import AdvancedDocumentPipeline app FastAPI(titleWeKnora-LangChain集成服务) # 全局客户端实例 weknora_client WeKnoraClient() processor WeKnoraDocumentProcessor(weknora_client) advanced_processor AdvancedDocumentPipeline(weknora_client) class QueryRequest(BaseModel): query: str knowledge_base_id: str None mode: str basic # basic, advanced, analysis class QueryResponse(BaseModel): answer: str sources: List[Dict[str, Any]] [] processing_time: float 0.0 app.post(/api/v1/query, response_modelQueryResponse) async def handle_query(request: QueryRequest): 处理用户查询的主接口 import time start_time time.time() try: if request.mode advanced: # 高级模式使用多步骤流水线 chain advanced_processor.create_multi_step_pipeline() result chain.invoke(request.query) answer f分析完成{result.get(summary, 无摘要)} sources [{source: advanced_analysis, content: str(result)}] elif request.mode analysis: # 分析模式返回结构化结果 chain advanced_processor.create_analysis_chain() result chain.invoke(request.query) answer f关键要点{, .join(result.get(key_points, []))} sources [{source: structured_analysis, content: str(result)}] else: # 基础模式标准RAG chain processor.create_rag_chain() answer chain.invoke(request.query) # 获取来源信息 sources weknora_client.search_knowledge(request.query, top_k3) processing_time time.time() - start_time return QueryResponse( answeranswer, sourcessources, processing_timeround(processing_time, 2) ) except Exception as e: raise HTTPException(status_code500, detailf处理失败: {str(e)}) app.get(/api/v1/health) async def health_check(): 健康检查接口 return {status: healthy, weknora_connected: True} if __name__ __main__: uvicorn.run(app, host0.0.0.0, port8000, reloadTrue)5.2 部署与使用说明创建README.md使用指南# WeKnora-LangChain集成项目 ## 快速开始 1. **确保WeKnora已运行** bash cd WeKnora ./scripts/start_all.sh启动LangChain服务# 安装依赖 pip install -r requirements.txt # 启动API服务 python app.py测试API# 基础问答 curl -X POST http://localhost:8000/api/v1/query \ -H Content-Type: application/json \ -d {query:WeKnora支持哪些文档格式} # 高级分析 curl -X POST http://localhost:8000/api/v1/query \ -H Content-Type: application/json \ -d {query:WeKnora的部署要求是什么, mode:advanced}配置说明环境变量默认值说明WEKNORA_API_BASE_URLhttp://localhost:8080WeKnora API地址WEKNORA_API_TOKENadminJWT认证令牌WEKNORA_KNOWLEDGE_BASE_IDyour_kb_id知识库IDOLLAMA_BASE_URLhttp://localhost:11434Ollama服务地址LLM_MODEL_NAMEqwen2.5:7b使用的LLM模型实用技巧调试模式设置LOG_LEVELDEBUG查看详细日志性能监控API响应中包含processing_time字段错误恢复服务自动重试失败的WeKnora请求灵活扩展通过mode参数切换不同处理模式常见问题Q: 如何获取WeKnora的API TokenA: 首次登录WeKnora Web界面后在浏览器开发者工具的Network标签中查找登录请求的响应头。Q: 检索结果为空怎么办A: 检查知识库是否已正确创建并完成文档处理确认文档状态为已完成。Q: 如何添加新的知识库A: 通过WeKnora Web界面创建然后在配置中更新WEKNORA_KNOWLEDGE_BASE_ID。--- **获取更多AI镜像** 想探索更多AI镜像和应用场景访问 [CSDN星图镜像广场](https://ai.csdn.net/?utm_sourcemirror_blog_end)提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
WeKnora与LangChain集成指南:打造智能文档处理流水线
发布时间:2026/5/27 14:38:18
WeKnora与LangChain集成指南打造智能文档处理流水线1. 为什么需要WeKnora与LangChain协同工作在实际的AI应用开发中我们常常面临一个现实问题单一框架很难完美覆盖所有需求。WeKnora作为腾讯开源的文档理解框架擅长处理复杂格式文档、多模态内容解析和企业级知识管理而LangChain则是构建LLM应用的通用框架提供了丰富的链式调用、工具集成和记忆管理能力。两者结合不是简单叠加而是能力互补。想象一下这样的场景你有一份包含图表、表格和文字的PDF技术文档需要从中提取关键参数并生成符合公司规范的技术报告。WeKnora能精准识别文档结构、提取表格数据、理解图表含义LangChain则能将这些结构化信息组织成逻辑清晰的报告并调用外部API补充最新行业标准。这种分工协作比任何单一框架都更接近真实业务需求。很多开发者尝试过直接用LangChain构建文档系统但很快会遇到瓶颈——文档解析质量不稳定、多模态支持弱、企业级功能缺失。而只用WeKnora又可能受限于其特定架构难以快速集成新工具或实现复杂业务逻辑。本文要解决的正是这个最后一公里问题如何让两个强大框架无缝协作而不是各自为战。2. 环境准备与基础对接2.1 WeKnora本地部署验证在开始集成前确保WeKnora已正确部署并运行。根据官方文档推荐使用Docker Compose方式# 克隆项目并进入目录 git clone https://github.com/Tencent/WeKnora.git cd WeKnora # 复制环境配置文件 cp .env.example .env # 启动服务首次运行会下载镜像需要一些时间 ./scripts/start_all.sh启动后通过以下命令验证各服务状态# 查看容器运行情况 docker compose ps # 检查API健康状态 curl http://localhost:8080/health # 应返回 {status:ok} # 测试基本API调用 curl -X POST http://localhost:8080/api/v1/auth/login \ -H Content-Type: application/json \ -d {username:admin,password:admin}如果遇到数据库初始化问题如relation models does not exist执行以下命令# 进入PostgreSQL容器执行初始化脚本 docker exec -i WeKnora-postgres psql -U postgres -d weknora -f /docker-entrypoint-initdb.d/00-init-db.sql2.2 LangChain环境搭建创建独立的Python环境避免依赖冲突# 创建虚拟环境 python -m venv weknora-langchain-env source weknora-langchain-env/bin/activate # Linux/Mac # weknora-langchain-env\Scripts\activate # Windows # 安装核心依赖 pip install langchain langchain-community langchain-core pydantic pip install requests python-dotenv创建基础配置文件.env存储WeKnora API连接信息# .env WEKNORA_API_BASE_URLhttp://localhost:8080 WEKNORA_API_TOKENyour_jwt_token_here WEKNORA_KNOWLEDGE_BASE_IDyour_knowledge_base_id2.3 建立WeKnora API客户端LangChain本身不直接支持WeKnora需要创建自定义客户端。创建weknora_client.pyimport os import json import requests from typing import List, Dict, Optional from pydantic import BaseModel class WeKnoraClient: def __init__(self, base_url: str None, token: str None): self.base_url base_url or os.getenv(WEKNORA_API_BASE_URL, http://localhost:8080) self.token token or os.getenv(WEKNORA_API_TOKEN) self.headers { Authorization: fBearer {self.token}, Content-Type: application/json } def search_knowledge(self, query: str, knowledge_base_id: str None, top_k: int 5) - List[Dict]: 调用WeKnora知识检索API kb_id knowledge_base_id or os.getenv(WEKNORA_KNOWLEDGE_BASE_ID) if not kb_id: raise ValueError(Knowledge base ID must be provided) url f{self.base_url}/api/v1/knowledge-bases/{kb_id}/search payload { query: query, top_k: top_k } try: response requests.post(url, headersself.headers, jsonpayload, timeout30) response.raise_for_status() return response.json().get(results, []) except requests.exceptions.RequestException as e: print(fWeKnora搜索失败: {e}) return [] def create_session(self, knowledge_base_id: str None) - str: 创建对话会话 kb_id knowledge_base_id or os.getenv(WEKNORA_KNOWLEDGE_BASE_ID) url f{self.base_url}/api/v1/sessions payload {knowledge_base_id: kb_id} try: response requests.post(url, headersself.headers, jsonpayload, timeout10) response.raise_for_status() return response.json().get(session_id, ) except requests.exceptions.RequestException as e: print(fWeKnora会话创建失败: {e}) return # 使用示例 if __name__ __main__: client WeKnoraClient() results client.search_knowledge(如何配置Ollama模型) print(f找到 {len(results)} 个相关结果)3. 构建端到端文档处理流水线3.1 文档预处理与向量化流程WeKnora已经完成了文档解析、分块和向量化的核心工作LangChain主要负责在其基础上构建更高层的处理逻辑。创建document_pipeline.pyfrom langchain_core.documents import Document from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_community.chat_models import ChatOllama from typing import List, Dict, Any import os class WeKnoraDocumentProcessor: def __init__(self, weknora_client): self.client weknora_client # 使用本地Ollama模型作为LLM后端 self.llm ChatOllama( modelos.getenv(LLM_MODEL_NAME, qwen2.5:7b), base_urlos.getenv(OLLAMA_BASE_URL, http://localhost:11434), temperature0.3 ) def weknora_retriever(self, query: str) - List[Document]: WeKnora检索器返回LangChain兼容的Document对象 results self.client.search_knowledge(query) documents [] for result in results: # 将WeKnora结果转换为LangChain Document doc Document( page_contentresult.get(content, ), metadata{ source: result.get(source, unknown), score: result.get(score, 0.0), chunk_id: result.get(chunk_id, ), knowledge_base_id: result.get(knowledge_base_id, ) } ) documents.append(doc) return documents def create_rag_chain(self): 创建RAG处理链 # 定义提示词模板 template 你是一个专业的技术文档助手。请基于以下提供的上下文信息准确回答用户的问题。 如果上下文信息不足以回答问题请如实说明不要编造答案。 上下文信息 {context} 用户问题{question} 你的回答 prompt ChatPromptTemplate.from_template(template) # 创建链式处理 rag_chain ( {context: self.weknora_retriever | self.format_documents, question: RunnablePassthrough()} | prompt | self.llm | StrOutputParser() ) return rag_chain def format_documents(self, docs) - str: 格式化检索到的文档用于提示词 formatted_docs [] for i, doc in enumerate(docs): formatted_docs.append(f文档 {i1}:\n来源: {doc.metadata.get(source, 未知)}\n内容: {doc.page_content[:500]}...) return \n\n.join(formatted_docs) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() processor WeKnoraDocumentProcessor(client) # 创建RAG链 rag_chain processor.create_rag_chain() # 测试查询 result rag_chain.invoke(WeKnora支持哪些文档格式) print(回答:, result)3.2 多阶段处理流水线设计实际业务中单一RAG链往往不够。我们需要更复杂的流水线来处理不同类型的文档任务。创建advanced_pipeline.pyfrom langchain_core.runnables import RunnableParallel, RunnableLambda from langchain_core.output_parsers import JsonOutputParser from langchain_core.pydantic_v1 import BaseModel, Field from typing import List, Dict, Any class DocumentAnalysis(BaseModel): 文档分析结果结构 key_points: List[str] Field(description文档中的关键要点) technical_terms: List[str] Field(description专业术语列表) action_items: List[str] Field(description需要执行的操作项) summary: str Field(description简洁摘要) class AdvancedDocumentPipeline: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama( modelqwen2.5:7b, base_urlhttp://localhost:11434, temperature0.1 ) def create_analysis_chain(self): 创建文档分析链返回结构化结果 parser JsonOutputParser(pydantic_objectDocumentAnalysis) template 你是一个专业的技术文档分析师。请仔细分析以下文档内容 并按照指定JSON格式输出分析结果。确保输出严格符合JSON Schema。 文档内容 {context} 输出格式要求 {format_instructions} 你的分析结果 prompt ChatPromptTemplate.from_template(template).partial( format_instructionsparser.get_format_instructions() ) return ( {context: self._get_document_context} | prompt | self.llm | parser ) def _get_document_context(self, query: str) - str: 获取文档上下文的辅助方法 results self.client.search_knowledge(query, top_k3) context_parts [] for result in results: context_parts.append(f【来源】{result.get(source, 未知)}\n{result.get(content, )}) return \n\n.join(context_parts) def create_multi_step_pipeline(self): 创建多步骤处理流水线 # 步骤1初步检索 retrieval_step RunnableLambda(lambda x: self.client.search_knowledge(x)) # 步骤2关键信息提取 extraction_step RunnableLambda(self._extract_key_info) # 步骤3综合分析 analysis_step self.create_analysis_chain() # 组合流水线 pipeline ( {query: RunnablePassthrough()} | {retrieval: retrieval_step, query: RunnablePassthrough()} | {extraction: extraction_step, query: RunnablePassthrough()} | analysis_step ) return pipeline def _extract_key_info(self, results) - Dict[str, Any]: 从检索结果中提取关键信息 if not results: return {key_points: [], technical_terms: [], action_items: [], summary: } # 简单提取逻辑实际中可替换为更复杂的LLM处理 all_content .join([r.get(content, ) for r in results]) words all_content.split() unique_words list(set(words)) return { key_points: [f关键点{i1} for i in range(min(3, len(results)))], technical_terms: unique_words[:5], action_items: [检查配置, 验证连接, 测试功能], summary: f文档分析完成共找到{len(results)}个相关信息片段 } # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() pipeline AdvancedDocumentPipeline(client) # 创建多步骤流水线 multi_step_chain pipeline.create_multi_step_pipeline() # 执行分析 result multi_step_chain.invoke(WeKnora的部署要求是什么) print(结构化分析结果:, result)4. 性能优化与实用技巧4.1 检索性能调优策略WeKnora默认使用混合检索BM25 向量但在LangChain集成中我们可以进一步优化from langchain_core.runnables import RunnableBranch from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate class OptimizedRetrieval: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama(modelqwen2.5:7b, base_urlhttp://localhost:11434) def adaptive_retriever(self, query: str) - List[Dict]: 自适应检索器根据查询类型选择最佳检索策略 # 首先判断查询类型 classification_prompt ChatPromptTemplate.from_template( 请分类以下用户查询属于哪种类型 - 技术问题涉及具体技术实现、配置、错误解决等 - 概念解释询问定义、原理、特点等 - 操作步骤需要具体操作指导、流程说明 - 数据查询需要查找具体数值、参数、版本等 查询{query} 只需返回类型名称不要解释。 ) classifier classification_prompt | self.llm | StrOutputParser() query_type classifier.invoke({query: query}).strip().lower() # 根据类型调整检索参数 if 技术问题 in query_type or 操作步骤 in query_type: # 技术类查询更注重精确匹配 return self.client.search_knowledge(query, top_k3) elif 概念解释 in query_type: # 概念类查询需要更广泛的语义匹配 return self.client.search_knowledge(query, top_k5, rerankTrue) else: # 默认使用平衡策略 return self.client.search_knowledge(query, top_k4) def hybrid_search_with_fallback(self, query: str) - str: 混合搜索带降级机制 # 尝试WeKnora主检索 results self.adaptive_retriever(query) if not results: # 降级到简单关键词搜索 print(WeKnora检索无结果启用降级搜索) simple_results self._simple_keyword_search(query) if simple_results: results simple_results # 构建响应 if results: context \n\n.join([f【{r.get(source, 来源)}】{r.get(content, )[:300]} for r in results]) return self._generate_response(query, context) else: return 抱歉未能在知识库中找到相关信息。 def _simple_keyword_search(self, query: str) - List[Dict]: 简单的关键词搜索降级方案 # 实际中可以连接到Elasticsearch或其他搜索引擎 # 这里模拟返回空结果 return [] def _generate_response(self, query: str, context: str) - str: 生成最终响应 prompt ChatPromptTemplate.from_template( 你是一个专业的技术助手。请基于以下上下文信息准确回答用户问题。 如果上下文信息不足请如实说明。 上下文 {context} 用户问题{query} 回答 ) chain prompt | self.llm | StrOutputParser() return chain.invoke({query: query, context: context}) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() optimizer OptimizedRetrieval(client) # 测试不同类型的查询 queries [ 如何配置Ollama模型, WeKnora是什么, 部署WeKnora需要多少内存 ] for q in queries: result optimizer.hybrid_search_with_fallback(q) print(f查询: {q}) print(f响应: {result[:100]}...\n)4.2 错误处理与用户体验增强在生产环境中健壮的错误处理至关重要。创建robust_pipeline.pyimport time import logging from functools import wraps from typing import Callable, Any, Optional from langchain_core.runnables import RunnableConfig # 配置日志 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) def retry_on_failure(max_retries: int 3, delay: float 1.0): 重试装饰器 def decorator(func: Callable) - Callable: wraps(func) def wrapper(*args, **kwargs): last_exception None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception e logger.warning(f第{attempt 1}次尝试失败: {e}) if attempt max_retries - 1: time.sleep(delay * (2 ** attempt)) # 指数退避 raise last_exception return wrapper return decorator class RobustDocumentPipeline: def __init__(self, weknora_client): self.client weknora_client self.llm ChatOllama(modelqwen2.5:7b, base_urlhttp://localhost:11434) retry_on_failure(max_retries2, delay0.5) def safe_search(self, query: str, **kwargs) - List[Dict]: 安全的搜索方法带重试机制 return self.client.search_knowledge(query, **kwargs) def create_resilient_chain(self): 创建具备容错能力的处理链 def resilient_retriever(query: str): try: # 尝试主检索 results self.safe_search(query, top_k5) if results: return results # 尝试模糊搜索 fuzzy_query self._generate_fuzzy_query(query) fuzzy_results self.safe_search(fuzzy_query, top_k3) return fuzzy_results if fuzzy_results else [] except Exception as e: logger.error(f检索过程发生错误: {e}) return [] def fallback_response(query: str) - str: 降级响应生成 return f当前无法访问知识库请稍后再试。您的问题{query} # 创建分支链 chain RunnableBranch( ( lambda x: len(self.safe_search(x, top_k1)) 0, lambda x: self._generate_answer(x, self.safe_search(x, top_k3)) ), fallback_response ) return chain def _generate_fuzzy_query(self, query: str) - str: 生成模糊查询 # 简单的模糊化移除停用词添加同义词 words query.split() if len(words) 2: return .join(words[:-1]) # 移除最后一个词 return query def _generate_answer(self, query: str, results: List[Dict]) - str: 生成答案 if not results: return 未找到相关信息。 context \n\n.join([ f【{r.get(source, 未知)}】{r.get(content, )[:200]} for r in results ]) prompt ChatPromptTemplate.from_template( 请基于以下信息用简洁明了的语言回答问题。 如果信息不完整请说明。 信息 {context} 问题{query} 回答 ) chain prompt | self.llm | StrOutputParser() return chain.invoke({query: query, context: context}) # 使用示例 if __name__ __main__: from weknora_client import WeKnoraClient client WeKnoraClient() robust_pipeline RobustDocumentPipeline(client) # 测试容错能力 chain robust_pipeline.create_resilient_chain() result chain.invoke(WeKnora的安装步骤是什么) print(容错响应:, result)5. 实际应用场景演示5.1 企业内部知识问答系统创建完整的应用入口app.pyfrom fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel from typing import List, Dict, Any import uvicorn from weknora_client import WeKnoraClient from document_pipeline import WeKnoraDocumentProcessor from advanced_pipeline import AdvancedDocumentPipeline app FastAPI(titleWeKnora-LangChain集成服务) # 全局客户端实例 weknora_client WeKnoraClient() processor WeKnoraDocumentProcessor(weknora_client) advanced_processor AdvancedDocumentPipeline(weknora_client) class QueryRequest(BaseModel): query: str knowledge_base_id: str None mode: str basic # basic, advanced, analysis class QueryResponse(BaseModel): answer: str sources: List[Dict[str, Any]] [] processing_time: float 0.0 app.post(/api/v1/query, response_modelQueryResponse) async def handle_query(request: QueryRequest): 处理用户查询的主接口 import time start_time time.time() try: if request.mode advanced: # 高级模式使用多步骤流水线 chain advanced_processor.create_multi_step_pipeline() result chain.invoke(request.query) answer f分析完成{result.get(summary, 无摘要)} sources [{source: advanced_analysis, content: str(result)}] elif request.mode analysis: # 分析模式返回结构化结果 chain advanced_processor.create_analysis_chain() result chain.invoke(request.query) answer f关键要点{, .join(result.get(key_points, []))} sources [{source: structured_analysis, content: str(result)}] else: # 基础模式标准RAG chain processor.create_rag_chain() answer chain.invoke(request.query) # 获取来源信息 sources weknora_client.search_knowledge(request.query, top_k3) processing_time time.time() - start_time return QueryResponse( answeranswer, sourcessources, processing_timeround(processing_time, 2) ) except Exception as e: raise HTTPException(status_code500, detailf处理失败: {str(e)}) app.get(/api/v1/health) async def health_check(): 健康检查接口 return {status: healthy, weknora_connected: True} if __name__ __main__: uvicorn.run(app, host0.0.0.0, port8000, reloadTrue)5.2 部署与使用说明创建README.md使用指南# WeKnora-LangChain集成项目 ## 快速开始 1. **确保WeKnora已运行** bash cd WeKnora ./scripts/start_all.sh启动LangChain服务# 安装依赖 pip install -r requirements.txt # 启动API服务 python app.py测试API# 基础问答 curl -X POST http://localhost:8000/api/v1/query \ -H Content-Type: application/json \ -d {query:WeKnora支持哪些文档格式} # 高级分析 curl -X POST http://localhost:8000/api/v1/query \ -H Content-Type: application/json \ -d {query:WeKnora的部署要求是什么, mode:advanced}配置说明环境变量默认值说明WEKNORA_API_BASE_URLhttp://localhost:8080WeKnora API地址WEKNORA_API_TOKENadminJWT认证令牌WEKNORA_KNOWLEDGE_BASE_IDyour_kb_id知识库IDOLLAMA_BASE_URLhttp://localhost:11434Ollama服务地址LLM_MODEL_NAMEqwen2.5:7b使用的LLM模型实用技巧调试模式设置LOG_LEVELDEBUG查看详细日志性能监控API响应中包含processing_time字段错误恢复服务自动重试失败的WeKnora请求灵活扩展通过mode参数切换不同处理模式常见问题Q: 如何获取WeKnora的API TokenA: 首次登录WeKnora Web界面后在浏览器开发者工具的Network标签中查找登录请求的响应头。Q: 检索结果为空怎么办A: 检查知识库是否已正确创建并完成文档处理确认文档状态为已完成。Q: 如何添加新的知识库A: 通过WeKnora Web界面创建然后在配置中更新WEKNORA_KNOWLEDGE_BASE_ID。--- **获取更多AI镜像** 想探索更多AI镜像和应用场景访问 [CSDN星图镜像广场](https://ai.csdn.net/?utm_sourcemirror_blog_end)提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。