JavaPythonRedisKafkaFaissTensorFlow

推荐系统召回-排序-重排架构实战

短视频平台千万DAU推荐系统:百万级召回→千级粗排→百级精排→最终重排的漏斗架构与实时特征工程实践

一、项目概述

1.1 项目背景

短视频平台的内容推荐系统是用户留存和变现的核心引擎。当用户在刷"下一条"时,后台需要在 100ms 以内从百万级内容池中选出最可能让用户感兴趣的几十条视频。这个过程必须在用户感知不到延迟的情况下完成——稍有卡顿,用户就会划走,而划走即流失。

项目初期,平台日活(DAU)已达到 3000 万,内容池超过 200 万条视频,日均推荐调用量超过 8 亿次。当时的推荐系统存在严重问题:基于规则的协同过滤无法捕捉用户的实时兴趣变化,热门内容霸屏导致长尾内容曝光不足,新用户冷启动依赖人工配置的标签规则,点击率长期徘徊在 11% 左右,用户平均停留时长仅为 35 分钟

1.2 核心矛盾:候选集太大 vs 精排代价太高

推荐系统的本质是一个信息检索与排序问题。百万级内容池意味着任何时刻都有百万个候选等待被评估,而精排模型(深度神经网络)单次推理耗时约 10-20ms,如果对百万内容全部做精排,仅模型推理就需要消耗 2.7 小时——这显然不可接受。

核心矛盾在于:精排模型的精度最高,但计算代价也最高;我们既需要模型的精度来提升推荐质量,又无法承受全量精排的代价。这形成了推荐系统最经典的设计范式——漏斗架构:用计算代价低、精度较低的方式逐步缩小候选集,最后只在极小集合上运行重型模型。

1.3 漏斗架构总体设计

系统采用四级漏斗架构,每一级逐步缩小候选集规模,同时逐步提升候选质量:

百万级召回层(QPS 10万)
数千粗排层(QPS 5千)
数百精排层(QPS 500)
数十重排层(最终输出)

1.4 量化指标

  • DAU:峰值 3500 万,稳定在千万级别
  • 推荐响应 P99:端到端 < 100ms(包含网络开销)
  • 推荐点击率:从 11% 提升至 15.2%(提升 38%)
  • 用户平均停留时长:从 35 分钟提升至 48 分钟(提升 37%)
  • 系统吞吐量:日均推荐调用 12 亿次,峰值 QPS 稳定在 15 万
💡 架构决策:选择 Java 作为推荐服务主语言的核心原因在于:Java 生态中有成熟的 Faiss 封装库(jvector)、完善的 Spark/Flink 流处理生态,以及稳定的企业级微服务框架(Spring Cloud)。Python 则用于模型训练(TensorFlow)和离线数据处理,两者通过 TensorFlow SavedModel + TensorFlow Serving 实现无缝对接。

二、技术架构设计

2.1 全链路漏斗流程

从用户发起请求到推荐结果返回,完整链路经过六个核心模块,每层都有明确的 QPS 目标和职责边界:

客户端

发起推荐请求
携带上下文特征

推荐服务

请求分发
超时控制

召回引擎

多路召回
百万→数千

粗排引擎

双塔模型
数千→数百

精排引擎

DNN模型
数百→数十

重排引擎

DPP多样性
最终输出

图:推荐系统全链路流水线,各模块 QPS 逐级递减

2.2 召回层多路召回设计

召回层是漏斗的第一级,也是最关键的一级。召回的质量直接决定了推荐系统的上限——如果好的内容根本没被召回,后续的排序和重排都是在矮子里选将军。

我们采用多路召回策略,并行执行多路独立的召回通道,每路召回解决不同维度的问题,最终合并去重:

🤝 协同过滤召回

基于用户历史行为计算相似度,召回"喜欢这个内容的人也喜欢"的内容

🔢 向量召回(Faiss)

ANN近邻搜索,基于内容embedding向量找到与用户兴趣向量最接近的内容

🔥 热度召回

全站热门内容、新鲜内容(24h内发布),保证内容的时效性和曝光公平性

📍 地理召回

基于用户地理位置(LBS)召回附近创作者的内容,增强本地化推荐

👤 作者召回

关注作者新内容、相似风格作者内容,提升用户与创作者的互动

🏷️ 标签召回

基于内容标签/类别体系,召回符合用户兴趣类别的内容

2.3 在线/离线分离策略

不同排序阶段对延迟和精度的要求不同,因此采用差异化的在线/离线策略:

召回层(在线·毫秒级)

多路召回并行执行,Faiss ANN检索耗时 < 5ms,总召回耗时 < 15ms

粗排层(在线·轻量模型)

双塔模型(Query塔 + Item塔),内积快速计算分数,单次推理 < 3ms,吞吐量高

精排层(准在线·DNN模型)

完整特征交叉(DIN/DIEN等注意力机制),精度最高,计算最重,通过模型压缩加速

重排层(在线·规则+算法)

DPP多样性算法 + 业务规则(去重、防霸屏、兴趣探索),耗时 < 5ms

2.4 实时特征工程架构

用户刚刚点了猫视频,下一条就想看到更多猫——这就是实时特征的价值。整个实时特征更新的链路如下:

用户行为产生

用户点击/播放/完播/收藏/分享等行为,客户端埋点上报至 Kafka 行为日志 Topic

Flink 流处理

Flink Consumer 消费 Kafka 行为事件,实时计算用户统计特征(点击次数、完播率、兴趣向量更新)

Redis 特征缓存

Flink 将更新后的特征写入 Redis(Hash结构),设置 TTL(如用户特征24h过期,内容特征7天)

推荐服务读取

推荐请求到达时,特征服务从 Redis 读取最新用户特征,构建推荐上下文,单次特征查询 < 2ms

2.5 技术选型

组件选型方案备选方案核心优势关键数据
向量检索Faiss IVFFlat + HNSWMilvus / QdrantFacebook开源,GPU加速,成熟稳定10亿向量检索 < 20ms
流处理Apache Flink 1.17Spark Streaming真正的流式处理,Exactly-Once语义秒级特征更新延迟
特征缓存Redis Cluster 7.0Dragonwell Cache高性能KV,海量并发P99 读取延迟 < 2ms
消息队列Kafka 3.5Pulsar / RocketMQ高吞吐,日均万亿消息单Topic 百万QPS
模型推理TensorFlow ServingTriton / ONNX Runtime版本管理,Batching优化单GPU 2000 QPS
离线训练TensorFlow + SparkPyTorch / XGBoost分布式训练,特征工程TB级样本小时级训练
服务框架Java Spring CloudGo + gRPC企业级稳定,生态完善服务治理成熟

三、核心技术挑战与解决方案

挑战一:向量召回——ANN召回率与Faiss索引热更新

Faiss 是 Facebook 开源的向量检索库,支持十亿级向量的 ANN(Approximate Nearest Neighbor)近邻搜索。我们的内容池有 200 万条视频,每条视频对应一个 128 维 embedding 向量,总向量规模约 200 万 × 128 × 4字节 ≈ 1GB。但挑战不仅在于检索速度,更在于:新增内容的向量质量不稳定,以及如何实现索引的增量热更新。

✅ 解决方案:IVFFlat + HNSW 混合索引 + 分层更新策略

索引结构选型:采用 IVFFlat(倒排文件索引)作为主索引,每个向量根据其特征被分配到不同的 Voronoi 分区,查询时只搜索最近的分区,而非全量扫描,召回率可达 95%以上;HNSW(Hierarchical Navigable Small World)作为二级索引,用于召回候选的重排序阶段,进一步提升精度。

增量热更新策略:白天采用增量更新模式,新视频发布后立即生成向量并写入增量索引(内存中的写优化结构),通过后台线程定期合并到主索引;凌晨 2-5 点低峰期执行全量索引重建,确保索引质量。具体实现为:主索引使用 IVFFlat(读优化),增量区使用 Flat(无索引,纯内存,写入速度极快),合并时将增量区批量导入 IVFFlat。

新增内容的冷启动:新内容没有足够的用户交互数据,直接用协同过滤无法生成高质量向量。冷启动解法是:先用内容自身特征(标题TF-IDF、类别标签、视觉特征)生成初始向量,发布后优先进入"冷启动流量池"(小流量曝光),积累足够的点击/完播数据后,用在线学习的方式更新向量。

📊 效果数据:向量召回覆盖率从 68% 提升至 96%,Faiss 索引单次查询 P99 从 35ms 降至 8ms,新增内容进入推荐池的时间从 48h 缩短至 15min

挑战二:粗排模型——凭什么"不算错"被精排打高分的候选?

粗排的核心任务是:将召回层输出的 数千个候选快速排序,从中选出最可能高分的 数百个送给精排。问题在于:粗排模型必须在 < 5ms 内完成数千个候选的评分,而精排模型的完整特征交叉(DIN等注意力机制)精度更高,但耗时也长得多。如果粗排误杀了精排的高分候选,推荐质量将严重退化。

✅ 解决方案:双塔模型(Dual Tower)实现快速向量内积评分

双塔模型原理:模型分为 Query 塔(用户侧)和 Item 塔(内容侧),两塔分别输出固定维度的向量。用户向量和内容向量在训练时通过内积计算匹配分数,训练收敛后,两侧的向量可以分别预先计算并缓存——在线推理时,只需要做向量内积(O(d) 复杂度),避免了遍历整个 DNN 的开销。

粗排与精排的精度 Gap 监控:引入"AUC Gap"指标监控粗排和精排的排序一致性。具体做法:定期对粗排输出的 Top-500 候选做全量精排,计算两个排序的 NDCG@100 差值。设定告警阈值为 NDCG Gap > 0.05,一旦超标立即触发模型重训。

分数校准问题:粗排分数(内积)和精排分数(DNN输出)在不同量纲,无法直接比较。解法是:对粗排分数做 isotonic regression 校准,将其映射到与精排分数相同的概率空间。校准数据来自历史回放日志(粗排分数 + 精排分数 + 真实标签)。

📊 效果数据:粗排推理耗时从 28ms 降至 3ms(从 DNN 推理改为向量内积),NDCG Gap 维持在 < 0.03,粗排过滤掉的候选中精排高分占比 < 5%。

挑战三:重排——防止同一作者霸屏的多样性控制

排序模型的优化目标是"用户最可能点击的内容排在前面",但这会导致一个严重问题:同一个热门作者的 N 条视频全部排在前 10 位,其余作者的内容完全没有曝光机会。这种"霸屏"现象会严重损害平台内容生态的多样性,降低创作者的积极性,最终导致用户疲劳流失。

✅ 解决方案:DPP(Determinantal Point Process)多样性算法

DPP 算法原理:DPP 是一种基于矩阵行列式的概率模型,用于从集合中选取一个既有多样性又有相关性的子集。其核心思想是:选中的子集之间的相似度越低(多样性越高),DPP 的概率密度越大。在推荐场景中,将每个候选内容视为一个向量(特征向量),选中的内容之间的相似度(特征空间中的内积)越低,推荐列表的多样性越高。

在线实时计算优化:DPP 的原始算法复杂度为 O(n³),对数百个候选计算开销过高。优化方案是:使用贪婪近似算法(Greedy DPP),每次选取当前候选集中与已选集合相似度最低且分数最高的 item,复杂度降为 O(n²),在数十毫秒内可完成。

多样性 vs 准确性的平衡:多样性参数(α)控制多样性权重,α=0 时等价于纯 CTR 排序,α=1 时强制完全分散作者。我们通过 A/B 测试找到最优平衡点:设定 α=0.3,即 70% 权重给 CTR 分数,30% 权重给多样性,在短期点击率和长期用户体验之间取得平衡。

📊 效果数据:前 10 位推荐的作者数量从平均 3.2 个提升至 7.8 个,创作者发布内容的曝光覆盖率从 12% 提升至 41%,用户 7 日留存率提升 8%

挑战四:实时特征秒级更新——防止特征穿越与一致性问题

实时特征更新的核心价值是"让推荐结果反映用户的最新兴趣",但引入实时性后,系统面临两个严峻问题:特征穿越(使用了"未来"的信息,导致数据泄露)和多实例一致性问题(特征服务部署多个实例,同一用户的特征在不同实例上可能不同)。

✅ 解决方案:事件时间戳水印 + Redis 写扩散一致性

防止特征穿越:特征穿越是最隐蔽也最危险的 bug。用户刚点击了视频A,推荐系统立即把视频A推荐给用户——用户会看到刚刚点过的内容,体验极差。解法是:在 Kafka 行为事件中加入事件时间戳,Flink 实时任务记录"已推荐给用户"的内容集合(Redis SET,TTL=30min),推荐服务在召回阶段过滤掉该集合中的内容。同时,设置行为事件的生效延迟窗口(5s),确保事件在特征更新前已完成"推荐-点击"闭环校验。

多实例一致性:特征服务部署 8 个实例,热点用户(如大V、活跃用户)的特征更新采用写扩散策略:Flink 写 Redis 时,同时写入一个"特征版本号"(时间戳),推荐请求时携带版本号,版本号不匹配则触发强制回源读取最新值,避免读取到过期缓存。

📊 效果数据:特征从用户行为发生到生效的延迟从 5 分钟降至 3 秒,特征穿越 bug 发生率从 2.3% 降至 0.01%,热点用户多实例特征差异率从 8% 降至 < 0.5%

四、关键技术实现

4.1 多路召回合并策略(分数归一化 + 加权求和)

多路召回的核心挑战是:各路召回的分数分布不同,不能直接比较。例如协同过滤召回的分数范围可能是 [0, 1],而向量召回的内积分数范围可能是 [-10, 100]。如果不进行归化就直接加权求和,低分路的召回结果会完全被高分路淹没。

// 多路召回合并:分数归一化 + 加权求和
public class MultiWayRecallMerger {

    // 各路召回的权重配置(通过Apollo动态调整)
    private static final Map RECALL_WEIGHTS = Map.of(
        "cf_recall",     0.30,  // 协同过滤召回
        "vector_recall",  0.40,  // 向量召回(最重要)
        "hot_recall",     0.15,  // 热度召回
        "geo_recall",     0.15   // 地理召回
    );

    public List merge(List<RecallResult>[] recallResults) {
        // Step 1: 遍历各路召回结果,分别归一化
        for (List<RecallResult> results : recallResults) {
            if (results.isEmpty()) continue;

            double minScore = results.stream().mapToDouble(r -> r.score).min().orElse(0);
            double maxScore = results.stream().mapToDouble(r -> r.score).max().orElse(1);
            double range = maxScore - minScore;

            // Min-Max归一化到 [0, 1]
            for (RecallResult r : results) {
                r.normalizedScore = (range > 0) ? (r.score - minScore) / range : 0.5;
            }
        }

        // Step 2: 合并去重(同一item可能在多路召回中出现)
        Map<String, MergedRecallItem> merged = new HashMap<>();
        for (List<RecallResult> results : recallResults) {
            for (RecallResult r : results) {
                String key = r.itemId;
                if (!merged.containsKey(key)) {
                    merged.put(key, new MergedRecallItem(r.itemId, r.itemScore));
                }
                MergedRecallItem m = merged.get(key);
                m.weightedScore += r.normalizedScore * RECALL_WEIGHTS.getOrDefault(r.source, 0.25);
                m.sourceCount++;
            }
        }

        // Step 3: 综合得分 = 加权得分 * 来源多样性奖励
        List<MergedRecallItem> finalList = merged.values().stream()
            .peek(m -> {
                // 多样性奖励:来自更多路召回的item得分更高
                double diversityBonus = 1 + 0.1 * Math.min(m.sourceCount - 1, 3);
                m.finalScore = m.weightedScore * diversityBonus;
            })
            .sorted((a, b) -> Double.compare(b.finalScore, a.finalScore))
            .limit(5000)  // 召回层输出:5000条
            .collect(Collectors.toList());

        return finalList;
    }
}

4.2 Faiss向量检索Java实现

使用Faiss的IVFFlat索引实现高效的近似最近邻检索:

// Faiss向量检索Java实现(使用JNA调用native库)
public class FaissVectorEngine {

    static { System.loadLibrary("faiss_jni"); }  // 加载Faiss JNI库

    private long indexPtr;  // Faiss Index指针

    /**
     * 构建IVFFlat索引(倒排文件索引)
     * nlist: 聚类中心数量(影响召回率和内存占用)
     */
    public void buildIndex(float[][] vectors, int nlist) throws Exception {
        int dimension = vectors[0].length;
        int totalVectors = vectors.length;

        // Step 1: 计算数据归一化(L2归一化,使余弦相似度=内积)
        for (float[] v : vectors) {
            float norm = 0;
            for (float f : v) norm += f * f;
            norm = (float) Math.sqrt(norm);
            if (norm > 0) {
                for (int i = 0; i < v.length; i++) v[i] /= norm;
            }
        }

        // Step 2: 训练量化器(k-means聚类,确定倒排列表)
        long quantizerPtr = Native.faiss_knn_center(dvectors, nlist);
        
        // Step 3: 构建IVFFlat索引
        this.indexPtr = Native.faiss_ivfflat_create(dimension, nlist, faiss_flag);
        Native.faiss_ivfflat_train(indexPtr, vectors, totalVectors);
        Native.faiss_ivfflat_add(indexPtr, vectors, totalVectors);
        
        // Step 4: 设置nprobe(搜索的聚类中心数,影响召回率和延迟)
        Native.faiss_ivfflat_set_nprobe(indexPtr, 64);
    }

    /**
     * 查询Top-K近邻
     * @param queryVector 查询向量(embedding)
     * @param topK 返回的近邻数量
     * @return 近邻ID列表和距离
     */
    public SearchResult search(float[] queryVector, int topK) {
        // 归一化查询向量
        float[] normalizedQuery = normalize(queryVector);

        // 分配结果数组
        long[] ids = new long[topK];
        float[] distances = new float[topK];

        // 执行搜索
        Native.faiss_ivfflat_search(indexPtr, normalizedQuery, topK, ids, distances);

        List<SearchResult.Item> items = new ArrayList<>();
        for (int i = 0; i < topK; i++) {
            if (ids[i] >= 0) {  // -1表示无效结果
                items.add(new SearchResult.Item((int)ids[i], distances[i]));
            }
        }
        return new SearchResult(items);
    }

    private float[] normalize(float[] v) {
        float norm = 0;
        for (float f : v) norm += f * f;
        norm = (float) Math.sqrt(norm);
        if (norm == 0) return v;
        float[] r = new float[v.length];
        for (int i = 0; i < v.length; i++) r[i] = v[i] / norm;
        return r;
    }
}

4.3 Flink实时特征更新

用户行为事件通过Flink流处理,秒级更新Redis中的特征:

// Flink实时特征更新(消费Kafka行为事件 → 更新Redis)
public class FlinkFeatureEngine {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(32);
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // 消费Kafka行为事件
        DataStream<UserBehavior> events = env
            .addSource(new FlinkKafkaConsumer<>(
                "user_behavior_events",
                new UserBehaviorDeserializer(),
                kafkaProps
            ))
            .uid("kafka-source")
            .name("Kafka行为事件源");

        // 实时特征更新
        events
            .keyBy(e -> e.userId)  // 按用户ID分桶
            .process(new FeatureUpdateProcessFunction())
            .uid("feature-update")
            .name("实时特征更新")
            .addSink(new RedisSink<>(redisConfig));
    }
}

public class FeatureUpdateProcessFunction 
    extends KeyedProcessFunction<String, UserBehavior, FeatureUpdate> {

    private transient RedisCommands<String, String> redis;

    @Override
    public void open(Configuration params) {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMinIdle(50);
        poolConfig.setMaxTotal(200);
        JedisPooled<String> jedis = new JedisPooled<>(poolConfig);
        this.redis = jedis.connect(new RedisOptions()).sync();
    }

    @Override
    public void processElement(UserBehavior event, Context ctx, Collector<FeatureUpdate> out) {
        String featureKey = "feature:user:" + event.userId + ":" + event.featureType;

        // 根据行为类型更新对应特征
        switch (event.behaviorType) {
            case "CLICK":
                // 累计点击次数
                redis.hincrBy(featureKey, "click_count", 1);
                redis.expire(featureKey, 86400);  // 24h过期
                // 记录最近点击时间
                redis.hset(featureKey, "last_click_time", String.valueOf(event.timestamp));
                break;

            case "VIEW_DETAIL":
                // 累计浏览次数(同一商品重复浏览也计数)
                redis.hincrBy(featureKey, "detail_view_count:" + event.itemId, 1);
                break;

            case "ADD_CART":
                // 标记加购意图
                redis.sadd("intent:cart:" + event.userId, event.itemId);
                break;
        }

        // 输出特征更新事件(供下游使用)
        out.collect(new FeatureUpdate(
            event.userId,
            event.featureType,
            event.itemId,
            event.timestamp
        ));
    }
}

五、性能指标与成果

5.1 核心业务指标

<100ms 推荐响应P99延迟
+32% 推荐点击率提升
+25% 用户停留时长提升
百万→数千 召回漏斗比
100亿+ 日均推荐请求量

5.2 各层性能数据

阶段 候选集规模 延迟P99 技术方案
召回层 100万 → 5000 15ms Faiss IVFFlat + 多路并行
粗排层 5000 → 500 20ms 双塔模型 + 实时特征
精排层 500 → 50 30ms DIN深度模型
重排层 50 → 10 5ms DPP多样性 + 打散
端到端 100万 → 10 ≤100ms 全链路优化

💡 经验一:漏斗逐级降维,平衡效果与性能

推荐系统的每一级漏斗,都需要精心设计"保留多少候选"——保留太多会导致下游计算爆炸,保留太少会漏掉好内容。我们的经验是:召回层保留候选集千分之五(百万→五千),粗排保留百分之一(五千→五十),精排保留百分之十(五十→五),重排根据业务需求灵活调整。这个比例不是固定的,需要通过AB测试持续优化。

💡 经验二:实时特征是推荐效果的"秘密武器"

离线训练的模型,使用的特征往往是"T-1天"甚至"T-7天"的。而用户的兴趣变化非常快——早上看了科技视频,晚上可能就在看美食。实时特征工程让推荐系统具备了"感知当下"的能力。我们实测发现,加入实时点击特征后,点击率提升约15%,但同时也要防止"特征穿越"(使用了当前时刻之后的数据),这是一个需要严格测试验证的边界。

💡 经验三:向量索引的更新策略决定推荐的新鲜度

Faiss索引的全量重建代价很高(十亿级向量,重建一次需要数小时),但完全不做更新又会导致新内容永远得不到曝光。我们的实践是:白天增量更新(每小时追加新内容的向量),凌晨低峰期做全量重建。同时,使用"热度boost"机制——新内容的曝光和点击会快速累积,这些行为数据可以实时更新其向量表示,形成正反馈循环。