Python FastAPI LangChain RAG WebSocket

企业网站集成AI智能机器人架构设计实战

基于FastAPI+LangChain+RAG构建企业级AI对话机器人,支持多模型编排、流式输出、私域知识库问答与敏感信息过滤

一、项目概述

1.1 业务背景

企业在数字化转型过程中,客服团队面临的核心矛盾是:用户咨询量随业务增长而增加,但人工客服成本高、响应慢、水平参差不齐。更棘手的是,超过 60% 的用户咨询属于重复性问题(产品介绍、价格政策、使用教程),人工处理极度浪费。

我们为集团旗下三个业务网站(官网、电商平台、内部知识库)构建统一的 AI 智能问答系统,核心目标:

  • 7×24 小时即时响应:用户提问后平均 3 秒内获得答案
  • 私域知识精准问答:基于企业私有文档(产品手册、FAQ、内部规范)作答,不乱编内容
  • 多轮对话上下文理解:支持追问、补充条件、澄清意图
  • 人工接管无缝衔接:AI 无法回答时自动转人工,保留对话历史
  • 对话质量可量化:实时统计回答准确率、转化率、用户满意度

1.2 核心技术指标

3秒 首Token响应时间
92.3% 回答准确率
60% 人工客服工作量降低
5万+ 日均对话量

二、技术架构设计

2.1 整体架构

系统采用微服务架构,核心分为五层:接入层 → 网关层 → 推理层 → 知识层 → 模型层。

用户请求(网站 Widget / API / 企微)
  │
  ▼
┌─────────────────────────┐
│  接入层                  │  WebSocket / HTTP / 企业微信 SDK
│  (Django Channels)      │
└─────────────────────────┘
  │
  ▼
┌─────────────────────────┐
│  API 网关 (Kong)        │  认证、限流、路由、监控
│  鉴权 / QPS限制(100/min) │
└─────────────────────────┘
  │
  ▼
┌─────────────────────────┐
│  对话管理服务            │  FastAPI + Uvicorn
│  - 会话管理              │  - 多轮对话上下文窗口管理
│  - 意图分类              │  - 敏感信息检测
│  - 模型编排              │  - 限流控制
└─────────────────────────┘
  │
  ├──────────────────────────────────┐
  ▼                                   ▼
┌───────────────────┐         ┌───────────────────┐
│  知识检索服务       │         │  模型推理服务       │
│  (RAG Pipeline)    │         │  (vLLM / OpenAI)  │
│  - Embedding       │         │  - 流式生成        │
│  - 向量检索         │         │  - Function Call  │
│  - 重排序           │         │  - Tool Use       │
└───────────────────┘         └───────────────────┘
  │
  ▼
┌─────────────────────────┐
│  数据层                 │
│  Redis (会话缓存)        │
│  Milvus (向量数据库)     │
│  PostgreSQL (结构化数据) │
│  MinIO (文档存储)        │
└─────────────────────────┘

2.2 核心技术选型

组件选型选型理由
推理框架vLLM + Qwen-72BPagedAttention显存管理,支持Tensor并行,吞吐量比HF高8倍
向量数据库Milvus 2.4十亿级向量检索,支持混合检索(稀疏+稠密)
EmbeddingBGE-large-zh中文语义能力最强,M3E 免费开源替代
RerankerJina-Reranker-v2重排序精度高,API调用简单
对话框架LangChain LCELChain 可组合,LCEL 语法优雅,集成度好
会话存储Redis ClusterSession Token 管理,TTL 自动过期,水平扩展

2.3 会话状态管理

多轮对话需要在有限上下文窗口内维护会话历史。采用滑动窗口策略:

class ConversationManager:
    """
    对话上下文管理器:基于滑动窗口的多轮对话管理
    """
    MAX_TOKENS = 128000  # Qwen-72B 的上下文窗口

    def __init__(self, session_id: str):
        self.session_id = session_id
        # 从 Redis 加载历史消息
        self.messages: List[ChatMessage] = self._load_from_redis()

    def add_message(self, role: str, content: str) -> List[ChatMessage]:
        self.messages.append(ChatMessage(role=role, content=content))
        # 滑动窗口:超出 token 限制时,从最早的消息开始裁剪
        while self.estimate_tokens() > self.MAX_TOKENS * 0.85:
            removed = self.messages.pop(0)  # FIFO,移除最早消息
            # 裁剪时保留系统提示词
            self.messages.insert(0, self.messages[0])
        # 持久化到 Redis
        self._save_to_redis()
        return self.messages

    def estimate_tokens(self) -> int:
        # 粗略估算:中文每字约1.5token,英文每词约1.3token
        total = 0
        for msg in self.messages:
            total += len(msg.content) * 1.5
        return int(total)

三、RAG知识库构建

3.1 文档处理流水线

企业私域知识包含多种格式:PDF文档(产品手册、合同模板)、Markdown(开发文档)、HTML(帮助中心)、结构化数据(FAQ表格)。需要统一处理后入库:

class DocumentProcessor:
    """
    多格式文档处理:PDF / Markdown / HTML / CSV → Chunk → Embedding → Milvus
    """
    def process(self, file_path: str) -> List[DocumentChunk]:
        file_type = self.detect_type(file_path)

        # 1. 内容提取
        if file_type == 'pdf':
            text = self._extract_pdf(file_path)
        elif file_type == 'markdown':
            text = self._extract_markdown(file_path)
        elif file_type == 'html':
            text = self._extract_html(file_path)
        elif file_type == 'csv':
            text = self._extract_csv(file_path)

        # 2. 语义分块(不同于简单截断)
        chunks = self._semantic_chunk(text, max_tokens=512, overlap=64)

        # 3. 元数据提取
        for i, chunk in enumerate(chunks):
            chunk.metadata = {
                'source': file_path,
                'chunk_index': i,
                'total_chunks': len(chunks),
                'title': self._extract_title(file_path),
                'category': self._infer_category(file_path),
            }

        # 4. 向量化(批处理,减少API调用)
        embeddings = self._batch_embed([c.content for c in chunks])

        for chunk, embedding in zip(chunks, embeddings):
            chunk.embedding = embedding

        return chunks

    def _semantic_chunk(self, text: str, max_tokens: int, overlap: int) -> List[DocumentChunk]:
        """
        语义分块:基于段落边界 + token 限制双约束
        不在句子中间截断,保留语义完整性
        """
        paragraphs = text.split('\n\n')
        chunks = []
        current = []
        current_tokens = 0

        for para in paragraphs:
            para_tokens = self._count_tokens(para)
            if current_tokens + para_tokens > max_tokens and current:
                chunks.append(DocumentChunk(content='\n\n'.join(current)))
                # 滑动窗口:保留 overlap 部分
                overlap_text = '\n\n'.join(current)[-overlap:] if overlap else ''
                current = [overlap_text, para]
                current_tokens = self._count_tokens(overlap_text) + para_tokens
            else:
                current.append(para)
                current_tokens += para_tokens

        if current:
            chunks.append(DocumentChunk(content='\n\n'.join(current)))
        return chunks

3.2 混合检索策略

单一向量检索对精确关键词(如产品型号、订单号)召回率低。采用 BM25 稀疏检索 + 向量检索的混合策略:

# 混合检索:稀疏检索(BM25)+ 稠密检索(Embedding)+ 重排序
class HybridRetriever:
    def __init__(self):
        self.bm25 = BM25Okapi()
        self.vector_store = MilvusVectorStore()
        self.reranker = JinaReranker()

    def retrieve(self, query: str, top_k: int = 20) -> List[ContextChunk]:
        # 1. 向量检索(语义相似性)
        dense_results = self.vector_store.similarity_search(
            query,
            k=top_k * 2  # 多取一些给重排序留空间
        )

        # 2. BM25 检索(关键词精确匹配)
        bm25_results = self.bm25.search(query, k=top_k)

        # 3. Reciprocal Rank Fusion 融合排序
        fused_scores = self._rrf_rank(dense_results, bm25_results, k=60)

        # 4. Cross-Encoder 重排序(精度最高)
        top_chunks = sorted(fused_scores, key=lambda x: x.score, reverse=True)[:top_k * 2]
        reranked = self.reranker.rerank(query, [c.content for c in top_chunks])

        return reranked[:top_k]

    def _rrf_rank(self, dense, sparse, k: int = 60) -> List[ContextChunk]:
        """
        Reciprocal Rank Fusion: 对多个检索结果列表进行融合排序
        """
        scores = {}
        for i, chunk in enumerate(dense):
            scores[chunk.id] = scores.get(chunk.id, 0) + 1 / (k + i + 1)
        for i, chunk in enumerate(sparse):
            scores[chunk.id] = scores.get(chunk.id, 0) + 1 / (k + i + 1)
        # 按融合分数排序
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        return [dense_by_id[id] for id in sorted_ids]

3.3 知识库增量更新

企业文档持续更新,不能每次全量重建。采用增量索引策略:

  • 定时同步:每日凌晨2点,对增量修改的文档执行增量 chunking 和 embedding
  • 版本管理:每个文档保留最新3个版本,检索时使用最新版本
  • 失效机制:文档过期或下线时,通过版本号比对自动剔除旧 chunk
  • 监控告警:知识库覆盖率低于80%时告警,提示管理员补充文档

四、流式输出架构

4.1 WebSocket + SSE 双协议

AI 对话要求实时流式输出,用户感知"打字中"而非"等待"。对 Web 端使用 SSE,对小程序/APP 使用 WebSocket,统一底层实现:

# FastAPI 流式端点
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    session_id = request.session_id

    async def event_generator():
        # 流式生成器
        async for token in chat_service.stream_chat(
            session_id=session_id,
            query=request.message,
            user_id=request.user_id,
        ):
            # Server-Sent Events 格式
            yield f"data: {json.dumps({'token': token, 'type': 'token'})}\n\n"

        # 结束信号
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        },
    )

# 底层流式推理(vLLM)
async def stream_chat(self, session_id, query, user_id):
    # 1. 构建 RAG 上下文
    contexts = self.retriever.retrieve(query, top_k=5)

    # 2. 构建 Prompt(Few-shot + RAG Context)
    prompt = self._build_prompt(query, contexts)

    # 3. 调用 vLLM 流式 API
    async for output in self.llm.stream_generate(prompt):
        token = output.outputs[0].text
        yield token  # 每个 token 立即推送

    # 4. 记录对话历史
    await self.conversation_manager.add_message("user", query)
    await self.conversation_manager.add_message("assistant", full_response)

4.2 前端渲染优化

流式输出的体验关键在前端渲染。我们实现了一套打字机效果的平滑渲染:

  • 增量渲染:每收到一个 token 就追加显示,不等完整句子
  • Markdown 实时解析:收到 Markdown 语法片段后实时渲染(无需等完整内容)
  • 防抖动:token 到达速度过快时(>100 token/s)做批量渲染,减少 DOM 更新
  • 光标跟随:显示区域自动滚动,光标始终在最后一行

五、多模型编排

5.1 模型分层策略

不同复杂度的问题应该用不同规模的模型,大模型回答简单问题成本高、延迟高:

层级模型适用场景延迟成本
L1轻量分类模型意图分类、闲聊<50ms极低
L2Qwen-7B / GPT-3.5简单FAQ、查询类问题<1s
L3Qwen-72B / GPT-4复杂推理、多跳问答、技术文档<5s
L4专家模型代码生成、数学推理<3s

5.2 意图分类路由

通过轻量级意图分类模型决定路由到哪个层级:

class IntentRouter:
    """
    意图分类路由:根据用户问题类型自动选择最优模型
    """
    # 意图类别定义
    INTENTS = {
        'greeting': {'model': 'l1', 'priority': 0},      # 打招呼
        'faq': {'model': 'l2', 'priority': 1},            # 常见问答
        'product_query': {'model': 'l2', 'priority': 1}, # 产品查询
        'technical': {'model': 'l3', 'priority': 2},     # 技术问题
        'multi_hop': {'model': 'l3', 'priority': 2},     # 多跳推理
        'code': {'model': 'l4', 'priority': 2},          # 代码相关
        'unknown': {'model': 'l3', 'priority': 2},      # 无法分类→大模型兜底
    }

    def classify(self, query: str) -> str:
        # L1 快速规则匹配
        for pattern, intent in self.rule_patterns.items():
            if re.search(pattern, query):
                return intent

        # L2 向量分类(Embedding + 分类器)
        embedding = self.embedding_model.encode(query)
        intent = self.classifier.predict(embedding)
        return intent

    async def route(self, query: str) -> str:
        intent = await self.classify(query)
        model_level = self.INTENTS[intent]['model']
        # 根据模型层级选择对应的推理服务
        return await self.inference_router.forward(query, model_level)

5.3 模型降级策略

大模型不可用时自动降级,避免服务中断:

async def chat_with_fallback(self, query: str, session_id: str) -> str:
    # 优先使用 L3 大模型
    try:
        return await self.call_model('l3', query)
    except ModelTimeoutError:
        # 超时降级:换用 L2 模型
        logger.warning(f"L3 model timeout, falling back to L2 for session {session_id}")
        try:
            return await self.call_model('l2', query)
        except ModelUnavailableError:
            # 模型完全不可用:返回预设兜底答案
            return self.fallback_responses.get(query, FUZZY_ANSWER)

六、敏感信息过滤

6.1 输入敏感信息检测

用户输入中可能包含个人隐私信息(身份证号、手机号、银行卡),这些信息不应进入 AI 模型(防止泄露风险):

class SensitiveInfoDetector:
    """
    敏感信息检测:正则 + NER 双层检测
    """
    PATTERNS = {
        'phone': re.compile(r'1[3-9]\d{9}'),                        # 手机号
        'id_card': re.compile(r'\d{17}[\dXx]'),                     # 身份证
        'bank_card': re.compile(r'\d{16,19}'),                     # 银行卡
        'email': re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+'), # 邮箱
    }

    def detect(self, text: str) -> Dict[str, List[str]]:
        results = {}
        for ptype, pattern in self.PATTERNS.items():
            matches = pattern.findall(text)
            if matches:
                results[ptype] = matches
        return results

    def mask(self, text: str) -> str:
        masked = text
        for ptype, pattern in self.PATTERNS.items():
            if ptype == 'phone':
                masked = pattern.sub(lambda m: m.group()[0:3]+'****'+m.group()[-4:], masked)
            elif ptype == 'id_card':
                masked = pattern.sub(lambda m: m.group()[0:6]+'********'+m.group()[-4:], masked)
        return masked

    def process_input(self, text: str) -> tuple[str, bool]:
        # 检测到敏感信息时,告警+脱敏后放行
        sensitive = self.detect(text)
        if sensitive:
            logger.warning(f"Sensitive info detected: {sensitive}")
            # 异步通知安全团队
            asyncio.create_task(self.notify_security(sensitive))
            return self.mask(text), True  # 脱敏后继续
        return text, False

6.2 输出安全策略

AI 输出可能包含不当内容,通过多层过滤:

  • Prompt 注入检测:检测用户输入中是否包含试图绕过系统提示的指令(如 "忽略之前的指示")
  • 输出内容分类:AI 输出经过二分类模型(安全/不安全),不安全内容直接拒绝
  • 关键词黑名单:企业敏感词汇(竞争对手名称、未公开产品名)过滤
  • 审计日志:所有对话(含脱敏前内容)异步写入审计日志,保留180天

七、部署与性能优化

7.1 vLLM 推理加速

Qwen-72B 的推理速度是传统 HF 框架的 8 倍,关键优化:

  • PagedAttention:将 KV Cache 分页管理,显存利用率从 30% 提升至 90%
  • Tensor 并行:4卡并行,吞吐量提升3倍
  • Continuous Batching:动态批处理,多请求共享计算资源
  • FlashAttention-2:注意力计算优化的 CUDA kernel,降低显存占用 50%

7.2 Kubernetes 弹性伸缩

对话量有明显波峰波谷(白天高、夜间低),基于 KEDA 实现 HPA 弹性伸缩:

# KEDA 配置:根据 Kafka 队列深度伸缩推理 Pod
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: llm-worker-scaler
spec:
  scaleTargetRef:
    name: llm-worker
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka-cluster:9092
        consumerGroup: llm-worker-group
        topic: chat-requests
        lagThreshold: "100"  # 队列积压超过100条时扩容
  minReplicaCount: 2
  maxReplicaCount: 20

7.3 缓存加速

高频重复问题(产品介绍、价格政策等)直接返回缓存结果,避免重复推理:

class SemanticCache:
    """
    语义缓存:基于向量相似度的问答缓存
    缓存命中时,直接返回缓存结果,跳过大模型推理
    """
    def __init__(self, redis, embedding_model):
        self.redis = redis
        self.embedding = embedding_model

    async def get(self, query: str, threshold: float = 0.92) -> Optional[str]:
        query_vec = await self.embedding.encode(query)
        # 在 Redis 中搜索最相似的问题
        cached = await self.redis.ft_search('qa_cache', query_vec, limit=1)
        if cached and cached[0].score >= threshold:
            logger.info(f"Cache hit for query: {query[:30]}...")
            return cached[0].answer
        return None

    async def set(self, query: str, answer: str):
        query_vec = await self.embedding.encode(query)
        await self.redis.hset('qa_cache', query_vec, answer)

语义缓存命中率约 35%,每天节省约 15,000 次大模型推理调用。

八、成果与总结

92.3% 回答准确率(人工评估)
60% 人工客服工作量降低
3秒 首Token平均响应时间
35% 语义缓存命中率

系统上线6个月后,AI 机器人累计处理对话 900 万次,日均 5 万次,解答率从上线初期的 75% 提升至 92%。团队将节省的客服人力重新分配至高价值服务,用户整体满意度(CSAT)从 3.2 分提升至 4.6 分(5分制)。