企业网站集成AI智能机器人架构设计实战
基于FastAPI+LangChain+RAG构建企业级AI对话机器人,支持多模型编排、流式输出、私域知识库问答与敏感信息过滤
一、项目概述
1.1 业务背景
企业在数字化转型过程中,客服团队面临的核心矛盾是:用户咨询量随业务增长而增加,但人工客服成本高、响应慢、水平参差不齐。更棘手的是,超过 60% 的用户咨询属于重复性问题(产品介绍、价格政策、使用教程),人工处理极度浪费。
我们为集团旗下三个业务网站(官网、电商平台、内部知识库)构建统一的 AI 智能问答系统,核心目标:
- 7×24 小时即时响应:用户提问后平均 3 秒内获得答案
- 私域知识精准问答:基于企业私有文档(产品手册、FAQ、内部规范)作答,不乱编内容
- 多轮对话上下文理解:支持追问、补充条件、澄清意图
- 人工接管无缝衔接:AI 无法回答时自动转人工,保留对话历史
- 对话质量可量化:实时统计回答准确率、转化率、用户满意度
1.2 核心技术指标
二、技术架构设计
2.1 整体架构
系统采用微服务架构,核心分为五层:接入层 → 网关层 → 推理层 → 知识层 → 模型层。
用户请求(网站 Widget / API / 企微)
│
▼
┌─────────────────────────┐
│ 接入层 │ WebSocket / HTTP / 企业微信 SDK
│ (Django Channels) │
└─────────────────────────┘
│
▼
┌─────────────────────────┐
│ API 网关 (Kong) │ 认证、限流、路由、监控
│ 鉴权 / QPS限制(100/min) │
└─────────────────────────┘
│
▼
┌─────────────────────────┐
│ 对话管理服务 │ FastAPI + Uvicorn
│ - 会话管理 │ - 多轮对话上下文窗口管理
│ - 意图分类 │ - 敏感信息检测
│ - 模型编排 │ - 限流控制
└─────────────────────────┘
│
├──────────────────────────────────┐
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ 知识检索服务 │ │ 模型推理服务 │
│ (RAG Pipeline) │ │ (vLLM / OpenAI) │
│ - Embedding │ │ - 流式生成 │
│ - 向量检索 │ │ - Function Call │
│ - 重排序 │ │ - Tool Use │
└───────────────────┘ └───────────────────┘
│
▼
┌─────────────────────────┐
│ 数据层 │
│ Redis (会话缓存) │
│ Milvus (向量数据库) │
│ PostgreSQL (结构化数据) │
│ MinIO (文档存储) │
└─────────────────────────┘
2.2 核心技术选型
| 组件 | 选型 | 选型理由 |
|---|---|---|
| 推理框架 | vLLM + Qwen-72B | PagedAttention显存管理,支持Tensor并行,吞吐量比HF高8倍 |
| 向量数据库 | Milvus 2.4 | 十亿级向量检索,支持混合检索(稀疏+稠密) |
| Embedding | BGE-large-zh | 中文语义能力最强,M3E 免费开源替代 |
| Reranker | Jina-Reranker-v2 | 重排序精度高,API调用简单 |
| 对话框架 | LangChain LCEL | Chain 可组合,LCEL 语法优雅,集成度好 |
| 会话存储 | Redis Cluster | Session Token 管理,TTL 自动过期,水平扩展 |
2.3 会话状态管理
多轮对话需要在有限上下文窗口内维护会话历史。采用滑动窗口策略:
class ConversationManager:
"""
对话上下文管理器:基于滑动窗口的多轮对话管理
"""
MAX_TOKENS = 128000 # Qwen-72B 的上下文窗口
def __init__(self, session_id: str):
self.session_id = session_id
# 从 Redis 加载历史消息
self.messages: List[ChatMessage] = self._load_from_redis()
def add_message(self, role: str, content: str) -> List[ChatMessage]:
self.messages.append(ChatMessage(role=role, content=content))
# 滑动窗口:超出 token 限制时,从最早的消息开始裁剪
while self.estimate_tokens() > self.MAX_TOKENS * 0.85:
removed = self.messages.pop(0) # FIFO,移除最早消息
# 裁剪时保留系统提示词
self.messages.insert(0, self.messages[0])
# 持久化到 Redis
self._save_to_redis()
return self.messages
def estimate_tokens(self) -> int:
# 粗略估算:中文每字约1.5token,英文每词约1.3token
total = 0
for msg in self.messages:
total += len(msg.content) * 1.5
return int(total)
三、RAG知识库构建
3.1 文档处理流水线
企业私域知识包含多种格式:PDF文档(产品手册、合同模板)、Markdown(开发文档)、HTML(帮助中心)、结构化数据(FAQ表格)。需要统一处理后入库:
class DocumentProcessor:
"""
多格式文档处理:PDF / Markdown / HTML / CSV → Chunk → Embedding → Milvus
"""
def process(self, file_path: str) -> List[DocumentChunk]:
file_type = self.detect_type(file_path)
# 1. 内容提取
if file_type == 'pdf':
text = self._extract_pdf(file_path)
elif file_type == 'markdown':
text = self._extract_markdown(file_path)
elif file_type == 'html':
text = self._extract_html(file_path)
elif file_type == 'csv':
text = self._extract_csv(file_path)
# 2. 语义分块(不同于简单截断)
chunks = self._semantic_chunk(text, max_tokens=512, overlap=64)
# 3. 元数据提取
for i, chunk in enumerate(chunks):
chunk.metadata = {
'source': file_path,
'chunk_index': i,
'total_chunks': len(chunks),
'title': self._extract_title(file_path),
'category': self._infer_category(file_path),
}
# 4. 向量化(批处理,减少API调用)
embeddings = self._batch_embed([c.content for c in chunks])
for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
return chunks
def _semantic_chunk(self, text: str, max_tokens: int, overlap: int) -> List[DocumentChunk]:
"""
语义分块:基于段落边界 + token 限制双约束
不在句子中间截断,保留语义完整性
"""
paragraphs = text.split('\n\n')
chunks = []
current = []
current_tokens = 0
for para in paragraphs:
para_tokens = self._count_tokens(para)
if current_tokens + para_tokens > max_tokens and current:
chunks.append(DocumentChunk(content='\n\n'.join(current)))
# 滑动窗口:保留 overlap 部分
overlap_text = '\n\n'.join(current)[-overlap:] if overlap else ''
current = [overlap_text, para]
current_tokens = self._count_tokens(overlap_text) + para_tokens
else:
current.append(para)
current_tokens += para_tokens
if current:
chunks.append(DocumentChunk(content='\n\n'.join(current)))
return chunks
3.2 混合检索策略
单一向量检索对精确关键词(如产品型号、订单号)召回率低。采用 BM25 稀疏检索 + 向量检索的混合策略:
# 混合检索:稀疏检索(BM25)+ 稠密检索(Embedding)+ 重排序
class HybridRetriever:
def __init__(self):
self.bm25 = BM25Okapi()
self.vector_store = MilvusVectorStore()
self.reranker = JinaReranker()
def retrieve(self, query: str, top_k: int = 20) -> List[ContextChunk]:
# 1. 向量检索(语义相似性)
dense_results = self.vector_store.similarity_search(
query,
k=top_k * 2 # 多取一些给重排序留空间
)
# 2. BM25 检索(关键词精确匹配)
bm25_results = self.bm25.search(query, k=top_k)
# 3. Reciprocal Rank Fusion 融合排序
fused_scores = self._rrf_rank(dense_results, bm25_results, k=60)
# 4. Cross-Encoder 重排序(精度最高)
top_chunks = sorted(fused_scores, key=lambda x: x.score, reverse=True)[:top_k * 2]
reranked = self.reranker.rerank(query, [c.content for c in top_chunks])
return reranked[:top_k]
def _rrf_rank(self, dense, sparse, k: int = 60) -> List[ContextChunk]:
"""
Reciprocal Rank Fusion: 对多个检索结果列表进行融合排序
"""
scores = {}
for i, chunk in enumerate(dense):
scores[chunk.id] = scores.get(chunk.id, 0) + 1 / (k + i + 1)
for i, chunk in enumerate(sparse):
scores[chunk.id] = scores.get(chunk.id, 0) + 1 / (k + i + 1)
# 按融合分数排序
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [dense_by_id[id] for id in sorted_ids]
3.3 知识库增量更新
企业文档持续更新,不能每次全量重建。采用增量索引策略:
- 定时同步:每日凌晨2点,对增量修改的文档执行增量 chunking 和 embedding
- 版本管理:每个文档保留最新3个版本,检索时使用最新版本
- 失效机制:文档过期或下线时,通过版本号比对自动剔除旧 chunk
- 监控告警:知识库覆盖率低于80%时告警,提示管理员补充文档
四、流式输出架构
4.1 WebSocket + SSE 双协议
AI 对话要求实时流式输出,用户感知"打字中"而非"等待"。对 Web 端使用 SSE,对小程序/APP 使用 WebSocket,统一底层实现:
# FastAPI 流式端点
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
session_id = request.session_id
async def event_generator():
# 流式生成器
async for token in chat_service.stream_chat(
session_id=session_id,
query=request.message,
user_id=request.user_id,
):
# Server-Sent Events 格式
yield f"data: {json.dumps({'token': token, 'type': 'token'})}\n\n"
# 结束信号
yield f"data: {json.dumps({'type': 'done'})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
# 底层流式推理(vLLM)
async def stream_chat(self, session_id, query, user_id):
# 1. 构建 RAG 上下文
contexts = self.retriever.retrieve(query, top_k=5)
# 2. 构建 Prompt(Few-shot + RAG Context)
prompt = self._build_prompt(query, contexts)
# 3. 调用 vLLM 流式 API
async for output in self.llm.stream_generate(prompt):
token = output.outputs[0].text
yield token # 每个 token 立即推送
# 4. 记录对话历史
await self.conversation_manager.add_message("user", query)
await self.conversation_manager.add_message("assistant", full_response)
4.2 前端渲染优化
流式输出的体验关键在前端渲染。我们实现了一套打字机效果的平滑渲染:
- 增量渲染:每收到一个 token 就追加显示,不等完整句子
- Markdown 实时解析:收到 Markdown 语法片段后实时渲染(无需等完整内容)
- 防抖动:token 到达速度过快时(>100 token/s)做批量渲染,减少 DOM 更新
- 光标跟随:显示区域自动滚动,光标始终在最后一行
五、多模型编排
5.1 模型分层策略
不同复杂度的问题应该用不同规模的模型,大模型回答简单问题成本高、延迟高:
| 层级 | 模型 | 适用场景 | 延迟 | 成本 |
|---|---|---|---|---|
| L1 | 轻量分类模型 | 意图分类、闲聊 | <50ms | 极低 |
| L2 | Qwen-7B / GPT-3.5 | 简单FAQ、查询类问题 | <1s | 低 |
| L3 | Qwen-72B / GPT-4 | 复杂推理、多跳问答、技术文档 | <5s | 高 |
| L4 | 专家模型 | 代码生成、数学推理 | <3s | 中 |
5.2 意图分类路由
通过轻量级意图分类模型决定路由到哪个层级:
class IntentRouter:
"""
意图分类路由:根据用户问题类型自动选择最优模型
"""
# 意图类别定义
INTENTS = {
'greeting': {'model': 'l1', 'priority': 0}, # 打招呼
'faq': {'model': 'l2', 'priority': 1}, # 常见问答
'product_query': {'model': 'l2', 'priority': 1}, # 产品查询
'technical': {'model': 'l3', 'priority': 2}, # 技术问题
'multi_hop': {'model': 'l3', 'priority': 2}, # 多跳推理
'code': {'model': 'l4', 'priority': 2}, # 代码相关
'unknown': {'model': 'l3', 'priority': 2}, # 无法分类→大模型兜底
}
def classify(self, query: str) -> str:
# L1 快速规则匹配
for pattern, intent in self.rule_patterns.items():
if re.search(pattern, query):
return intent
# L2 向量分类(Embedding + 分类器)
embedding = self.embedding_model.encode(query)
intent = self.classifier.predict(embedding)
return intent
async def route(self, query: str) -> str:
intent = await self.classify(query)
model_level = self.INTENTS[intent]['model']
# 根据模型层级选择对应的推理服务
return await self.inference_router.forward(query, model_level)
5.3 模型降级策略
大模型不可用时自动降级,避免服务中断:
async def chat_with_fallback(self, query: str, session_id: str) -> str:
# 优先使用 L3 大模型
try:
return await self.call_model('l3', query)
except ModelTimeoutError:
# 超时降级:换用 L2 模型
logger.warning(f"L3 model timeout, falling back to L2 for session {session_id}")
try:
return await self.call_model('l2', query)
except ModelUnavailableError:
# 模型完全不可用:返回预设兜底答案
return self.fallback_responses.get(query, FUZZY_ANSWER)
六、敏感信息过滤
6.1 输入敏感信息检测
用户输入中可能包含个人隐私信息(身份证号、手机号、银行卡),这些信息不应进入 AI 模型(防止泄露风险):
class SensitiveInfoDetector:
"""
敏感信息检测:正则 + NER 双层检测
"""
PATTERNS = {
'phone': re.compile(r'1[3-9]\d{9}'), # 手机号
'id_card': re.compile(r'\d{17}[\dXx]'), # 身份证
'bank_card': re.compile(r'\d{16,19}'), # 银行卡
'email': re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+'), # 邮箱
}
def detect(self, text: str) -> Dict[str, List[str]]:
results = {}
for ptype, pattern in self.PATTERNS.items():
matches = pattern.findall(text)
if matches:
results[ptype] = matches
return results
def mask(self, text: str) -> str:
masked = text
for ptype, pattern in self.PATTERNS.items():
if ptype == 'phone':
masked = pattern.sub(lambda m: m.group()[0:3]+'****'+m.group()[-4:], masked)
elif ptype == 'id_card':
masked = pattern.sub(lambda m: m.group()[0:6]+'********'+m.group()[-4:], masked)
return masked
def process_input(self, text: str) -> tuple[str, bool]:
# 检测到敏感信息时,告警+脱敏后放行
sensitive = self.detect(text)
if sensitive:
logger.warning(f"Sensitive info detected: {sensitive}")
# 异步通知安全团队
asyncio.create_task(self.notify_security(sensitive))
return self.mask(text), True # 脱敏后继续
return text, False
6.2 输出安全策略
AI 输出可能包含不当内容,通过多层过滤:
- Prompt 注入检测:检测用户输入中是否包含试图绕过系统提示的指令(如 "忽略之前的指示")
- 输出内容分类:AI 输出经过二分类模型(安全/不安全),不安全内容直接拒绝
- 关键词黑名单:企业敏感词汇(竞争对手名称、未公开产品名)过滤
- 审计日志:所有对话(含脱敏前内容)异步写入审计日志,保留180天
七、部署与性能优化
7.1 vLLM 推理加速
Qwen-72B 的推理速度是传统 HF 框架的 8 倍,关键优化:
- PagedAttention:将 KV Cache 分页管理,显存利用率从 30% 提升至 90%
- Tensor 并行:4卡并行,吞吐量提升3倍
- Continuous Batching:动态批处理,多请求共享计算资源
- FlashAttention-2:注意力计算优化的 CUDA kernel,降低显存占用 50%
7.2 Kubernetes 弹性伸缩
对话量有明显波峰波谷(白天高、夜间低),基于 KEDA 实现 HPA 弹性伸缩:
# KEDA 配置:根据 Kafka 队列深度伸缩推理 Pod
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: llm-worker-scaler
spec:
scaleTargetRef:
name: llm-worker
triggers:
- type: kafka
metadata:
bootstrapServers: kafka-cluster:9092
consumerGroup: llm-worker-group
topic: chat-requests
lagThreshold: "100" # 队列积压超过100条时扩容
minReplicaCount: 2
maxReplicaCount: 20
7.3 缓存加速
高频重复问题(产品介绍、价格政策等)直接返回缓存结果,避免重复推理:
class SemanticCache:
"""
语义缓存:基于向量相似度的问答缓存
缓存命中时,直接返回缓存结果,跳过大模型推理
"""
def __init__(self, redis, embedding_model):
self.redis = redis
self.embedding = embedding_model
async def get(self, query: str, threshold: float = 0.92) -> Optional[str]:
query_vec = await self.embedding.encode(query)
# 在 Redis 中搜索最相似的问题
cached = await self.redis.ft_search('qa_cache', query_vec, limit=1)
if cached and cached[0].score >= threshold:
logger.info(f"Cache hit for query: {query[:30]}...")
return cached[0].answer
return None
async def set(self, query: str, answer: str):
query_vec = await self.embedding.encode(query)
await self.redis.hset('qa_cache', query_vec, answer)
语义缓存命中率约 35%,每天节省约 15,000 次大模型推理调用。
八、成果与总结
系统上线6个月后,AI 机器人累计处理对话 900 万次,日均 5 万次,解答率从上线初期的 75% 提升至 92%。团队将节省的客服人力重新分配至高价值服务,用户整体满意度(CSAT)从 3.2 分提升至 4.6 分(5分制)。