推荐系统召回-排序-重排架构实战
短视频平台千万DAU推荐系统:百万级召回→千级粗排→百级精排→最终重排的漏斗架构与实时特征工程实践
一、项目概述
1.1 项目背景
短视频平台的内容推荐系统是用户留存和变现的核心引擎。当用户在刷"下一条"时,后台需要在 100ms 以内从百万级内容池中选出最可能让用户感兴趣的几十条视频。这个过程必须在用户感知不到延迟的情况下完成——稍有卡顿,用户就会划走,而划走即流失。
项目初期,平台日活(DAU)已达到 3000 万,内容池超过 200 万条视频,日均推荐调用量超过 8 亿次。当时的推荐系统存在严重问题:基于规则的协同过滤无法捕捉用户的实时兴趣变化,热门内容霸屏导致长尾内容曝光不足,新用户冷启动依赖人工配置的标签规则,点击率长期徘徊在 11% 左右,用户平均停留时长仅为 35 分钟。
1.2 核心矛盾:候选集太大 vs 精排代价太高
推荐系统的本质是一个信息检索与排序问题。百万级内容池意味着任何时刻都有百万个候选等待被评估,而精排模型(深度神经网络)单次推理耗时约 10-20ms,如果对百万内容全部做精排,仅模型推理就需要消耗 2.7 小时——这显然不可接受。
核心矛盾在于:精排模型的精度最高,但计算代价也最高;我们既需要模型的精度来提升推荐质量,又无法承受全量精排的代价。这形成了推荐系统最经典的设计范式——漏斗架构:用计算代价低、精度较低的方式逐步缩小候选集,最后只在极小集合上运行重型模型。
1.3 漏斗架构总体设计
系统采用四级漏斗架构,每一级逐步缩小候选集规模,同时逐步提升候选质量:
1.4 量化指标
- DAU:峰值 3500 万,稳定在千万级别
- 推荐响应 P99:端到端 < 100ms(包含网络开销)
- 推荐点击率:从 11% 提升至 15.2%(提升 38%)
- 用户平均停留时长:从 35 分钟提升至 48 分钟(提升 37%)
- 系统吞吐量:日均推荐调用 12 亿次,峰值 QPS 稳定在 15 万
二、技术架构设计
2.1 全链路漏斗流程
从用户发起请求到推荐结果返回,完整链路经过六个核心模块,每层都有明确的 QPS 目标和职责边界:
发起推荐请求
携带上下文特征
请求分发
超时控制
多路召回
百万→数千
双塔模型
数千→数百
DNN模型
数百→数十
DPP多样性
最终输出
图:推荐系统全链路流水线,各模块 QPS 逐级递减
2.2 召回层多路召回设计
召回层是漏斗的第一级,也是最关键的一级。召回的质量直接决定了推荐系统的上限——如果好的内容根本没被召回,后续的排序和重排都是在矮子里选将军。
我们采用多路召回策略,并行执行多路独立的召回通道,每路召回解决不同维度的问题,最终合并去重:
🤝 协同过滤召回
基于用户历史行为计算相似度,召回"喜欢这个内容的人也喜欢"的内容
🔢 向量召回(Faiss)
ANN近邻搜索,基于内容embedding向量找到与用户兴趣向量最接近的内容
🔥 热度召回
全站热门内容、新鲜内容(24h内发布),保证内容的时效性和曝光公平性
📍 地理召回
基于用户地理位置(LBS)召回附近创作者的内容,增强本地化推荐
👤 作者召回
关注作者新内容、相似风格作者内容,提升用户与创作者的互动
🏷️ 标签召回
基于内容标签/类别体系,召回符合用户兴趣类别的内容
2.3 在线/离线分离策略
不同排序阶段对延迟和精度的要求不同,因此采用差异化的在线/离线策略:
多路召回并行执行,Faiss ANN检索耗时 < 5ms,总召回耗时 < 15ms
双塔模型(Query塔 + Item塔),内积快速计算分数,单次推理 < 3ms,吞吐量高
完整特征交叉(DIN/DIEN等注意力机制),精度最高,计算最重,通过模型压缩加速
DPP多样性算法 + 业务规则(去重、防霸屏、兴趣探索),耗时 < 5ms
2.4 实时特征工程架构
用户刚刚点了猫视频,下一条就想看到更多猫——这就是实时特征的价值。整个实时特征更新的链路如下:
用户点击/播放/完播/收藏/分享等行为,客户端埋点上报至 Kafka 行为日志 Topic
Flink Consumer 消费 Kafka 行为事件,实时计算用户统计特征(点击次数、完播率、兴趣向量更新)
Flink 将更新后的特征写入 Redis(Hash结构),设置 TTL(如用户特征24h过期,内容特征7天)
推荐请求到达时,特征服务从 Redis 读取最新用户特征,构建推荐上下文,单次特征查询 < 2ms
2.5 技术选型
| 组件 | 选型方案 | 备选方案 | 核心优势 | 关键数据 |
|---|---|---|---|---|
| 向量检索 | Faiss IVFFlat + HNSW | Milvus / Qdrant | Facebook开源,GPU加速,成熟稳定 | 10亿向量检索 < 20ms |
| 流处理 | Apache Flink 1.17 | Spark Streaming | 真正的流式处理,Exactly-Once语义 | 秒级特征更新延迟 |
| 特征缓存 | Redis Cluster 7.0 | Dragonwell Cache | 高性能KV,海量并发 | P99 读取延迟 < 2ms |
| 消息队列 | Kafka 3.5 | Pulsar / RocketMQ | 高吞吐,日均万亿消息 | 单Topic 百万QPS |
| 模型推理 | TensorFlow Serving | Triton / ONNX Runtime | 版本管理,Batching优化 | 单GPU 2000 QPS |
| 离线训练 | TensorFlow + Spark | PyTorch / XGBoost | 分布式训练,特征工程 | TB级样本小时级训练 |
| 服务框架 | Java Spring Cloud | Go + gRPC | 企业级稳定,生态完善 | 服务治理成熟 |
三、核心技术挑战与解决方案
挑战一:向量召回——ANN召回率与Faiss索引热更新
Faiss 是 Facebook 开源的向量检索库,支持十亿级向量的 ANN(Approximate Nearest Neighbor)近邻搜索。我们的内容池有 200 万条视频,每条视频对应一个 128 维 embedding 向量,总向量规模约 200 万 × 128 × 4字节 ≈ 1GB。但挑战不仅在于检索速度,更在于:新增内容的向量质量不稳定,以及如何实现索引的增量热更新。
✅ 解决方案:IVFFlat + HNSW 混合索引 + 分层更新策略
索引结构选型:采用 IVFFlat(倒排文件索引)作为主索引,每个向量根据其特征被分配到不同的 Voronoi 分区,查询时只搜索最近的分区,而非全量扫描,召回率可达 95%以上;HNSW(Hierarchical Navigable Small World)作为二级索引,用于召回候选的重排序阶段,进一步提升精度。
增量热更新策略:白天采用增量更新模式,新视频发布后立即生成向量并写入增量索引(内存中的写优化结构),通过后台线程定期合并到主索引;凌晨 2-5 点低峰期执行全量索引重建,确保索引质量。具体实现为:主索引使用 IVFFlat(读优化),增量区使用 Flat(无索引,纯内存,写入速度极快),合并时将增量区批量导入 IVFFlat。
新增内容的冷启动:新内容没有足够的用户交互数据,直接用协同过滤无法生成高质量向量。冷启动解法是:先用内容自身特征(标题TF-IDF、类别标签、视觉特征)生成初始向量,发布后优先进入"冷启动流量池"(小流量曝光),积累足够的点击/完播数据后,用在线学习的方式更新向量。
挑战二:粗排模型——凭什么"不算错"被精排打高分的候选?
粗排的核心任务是:将召回层输出的 数千个候选快速排序,从中选出最可能高分的 数百个送给精排。问题在于:粗排模型必须在 < 5ms 内完成数千个候选的评分,而精排模型的完整特征交叉(DIN等注意力机制)精度更高,但耗时也长得多。如果粗排误杀了精排的高分候选,推荐质量将严重退化。
✅ 解决方案:双塔模型(Dual Tower)实现快速向量内积评分
双塔模型原理:模型分为 Query 塔(用户侧)和 Item 塔(内容侧),两塔分别输出固定维度的向量。用户向量和内容向量在训练时通过内积计算匹配分数,训练收敛后,两侧的向量可以分别预先计算并缓存——在线推理时,只需要做向量内积(O(d) 复杂度),避免了遍历整个 DNN 的开销。
粗排与精排的精度 Gap 监控:引入"AUC Gap"指标监控粗排和精排的排序一致性。具体做法:定期对粗排输出的 Top-500 候选做全量精排,计算两个排序的 NDCG@100 差值。设定告警阈值为 NDCG Gap > 0.05,一旦超标立即触发模型重训。
分数校准问题:粗排分数(内积)和精排分数(DNN输出)在不同量纲,无法直接比较。解法是:对粗排分数做 isotonic regression 校准,将其映射到与精排分数相同的概率空间。校准数据来自历史回放日志(粗排分数 + 精排分数 + 真实标签)。
挑战三:重排——防止同一作者霸屏的多样性控制
排序模型的优化目标是"用户最可能点击的内容排在前面",但这会导致一个严重问题:同一个热门作者的 N 条视频全部排在前 10 位,其余作者的内容完全没有曝光机会。这种"霸屏"现象会严重损害平台内容生态的多样性,降低创作者的积极性,最终导致用户疲劳流失。
✅ 解决方案:DPP(Determinantal Point Process)多样性算法
DPP 算法原理:DPP 是一种基于矩阵行列式的概率模型,用于从集合中选取一个既有多样性又有相关性的子集。其核心思想是:选中的子集之间的相似度越低(多样性越高),DPP 的概率密度越大。在推荐场景中,将每个候选内容视为一个向量(特征向量),选中的内容之间的相似度(特征空间中的内积)越低,推荐列表的多样性越高。
在线实时计算优化:DPP 的原始算法复杂度为 O(n³),对数百个候选计算开销过高。优化方案是:使用贪婪近似算法(Greedy DPP),每次选取当前候选集中与已选集合相似度最低且分数最高的 item,复杂度降为 O(n²),在数十毫秒内可完成。
多样性 vs 准确性的平衡:多样性参数(α)控制多样性权重,α=0 时等价于纯 CTR 排序,α=1 时强制完全分散作者。我们通过 A/B 测试找到最优平衡点:设定 α=0.3,即 70% 权重给 CTR 分数,30% 权重给多样性,在短期点击率和长期用户体验之间取得平衡。
挑战四:实时特征秒级更新——防止特征穿越与一致性问题
实时特征更新的核心价值是"让推荐结果反映用户的最新兴趣",但引入实时性后,系统面临两个严峻问题:特征穿越(使用了"未来"的信息,导致数据泄露)和多实例一致性问题(特征服务部署多个实例,同一用户的特征在不同实例上可能不同)。
✅ 解决方案:事件时间戳水印 + Redis 写扩散一致性
防止特征穿越:特征穿越是最隐蔽也最危险的 bug。用户刚点击了视频A,推荐系统立即把视频A推荐给用户——用户会看到刚刚点过的内容,体验极差。解法是:在 Kafka 行为事件中加入事件时间戳,Flink 实时任务记录"已推荐给用户"的内容集合(Redis SET,TTL=30min),推荐服务在召回阶段过滤掉该集合中的内容。同时,设置行为事件的生效延迟窗口(5s),确保事件在特征更新前已完成"推荐-点击"闭环校验。
多实例一致性:特征服务部署 8 个实例,热点用户(如大V、活跃用户)的特征更新采用写扩散策略:Flink 写 Redis 时,同时写入一个"特征版本号"(时间戳),推荐请求时携带版本号,版本号不匹配则触发强制回源读取最新值,避免读取到过期缓存。
四、关键技术实现
4.1 多路召回合并策略(分数归一化 + 加权求和)
多路召回的核心挑战是:各路召回的分数分布不同,不能直接比较。例如协同过滤召回的分数范围可能是 [0, 1],而向量召回的内积分数范围可能是 [-10, 100]。如果不进行归化就直接加权求和,低分路的召回结果会完全被高分路淹没。
// 多路召回合并:分数归一化 + 加权求和
public class MultiWayRecallMerger {
// 各路召回的权重配置(通过Apollo动态调整)
private static final Map RECALL_WEIGHTS = Map.of(
"cf_recall", 0.30, // 协同过滤召回
"vector_recall", 0.40, // 向量召回(最重要)
"hot_recall", 0.15, // 热度召回
"geo_recall", 0.15 // 地理召回
);
public List merge(List<RecallResult>[] recallResults) {
// Step 1: 遍历各路召回结果,分别归一化
for (List<RecallResult> results : recallResults) {
if (results.isEmpty()) continue;
double minScore = results.stream().mapToDouble(r -> r.score).min().orElse(0);
double maxScore = results.stream().mapToDouble(r -> r.score).max().orElse(1);
double range = maxScore - minScore;
// Min-Max归一化到 [0, 1]
for (RecallResult r : results) {
r.normalizedScore = (range > 0) ? (r.score - minScore) / range : 0.5;
}
}
// Step 2: 合并去重(同一item可能在多路召回中出现)
Map<String, MergedRecallItem> merged = new HashMap<>();
for (List<RecallResult> results : recallResults) {
for (RecallResult r : results) {
String key = r.itemId;
if (!merged.containsKey(key)) {
merged.put(key, new MergedRecallItem(r.itemId, r.itemScore));
}
MergedRecallItem m = merged.get(key);
m.weightedScore += r.normalizedScore * RECALL_WEIGHTS.getOrDefault(r.source, 0.25);
m.sourceCount++;
}
}
// Step 3: 综合得分 = 加权得分 * 来源多样性奖励
List<MergedRecallItem> finalList = merged.values().stream()
.peek(m -> {
// 多样性奖励:来自更多路召回的item得分更高
double diversityBonus = 1 + 0.1 * Math.min(m.sourceCount - 1, 3);
m.finalScore = m.weightedScore * diversityBonus;
})
.sorted((a, b) -> Double.compare(b.finalScore, a.finalScore))
.limit(5000) // 召回层输出:5000条
.collect(Collectors.toList());
return finalList;
}
}
4.2 Faiss向量检索Java实现
使用Faiss的IVFFlat索引实现高效的近似最近邻检索:
// Faiss向量检索Java实现(使用JNA调用native库)
public class FaissVectorEngine {
static { System.loadLibrary("faiss_jni"); } // 加载Faiss JNI库
private long indexPtr; // Faiss Index指针
/**
* 构建IVFFlat索引(倒排文件索引)
* nlist: 聚类中心数量(影响召回率和内存占用)
*/
public void buildIndex(float[][] vectors, int nlist) throws Exception {
int dimension = vectors[0].length;
int totalVectors = vectors.length;
// Step 1: 计算数据归一化(L2归一化,使余弦相似度=内积)
for (float[] v : vectors) {
float norm = 0;
for (float f : v) norm += f * f;
norm = (float) Math.sqrt(norm);
if (norm > 0) {
for (int i = 0; i < v.length; i++) v[i] /= norm;
}
}
// Step 2: 训练量化器(k-means聚类,确定倒排列表)
long quantizerPtr = Native.faiss_knn_center(dvectors, nlist);
// Step 3: 构建IVFFlat索引
this.indexPtr = Native.faiss_ivfflat_create(dimension, nlist, faiss_flag);
Native.faiss_ivfflat_train(indexPtr, vectors, totalVectors);
Native.faiss_ivfflat_add(indexPtr, vectors, totalVectors);
// Step 4: 设置nprobe(搜索的聚类中心数,影响召回率和延迟)
Native.faiss_ivfflat_set_nprobe(indexPtr, 64);
}
/**
* 查询Top-K近邻
* @param queryVector 查询向量(embedding)
* @param topK 返回的近邻数量
* @return 近邻ID列表和距离
*/
public SearchResult search(float[] queryVector, int topK) {
// 归一化查询向量
float[] normalizedQuery = normalize(queryVector);
// 分配结果数组
long[] ids = new long[topK];
float[] distances = new float[topK];
// 执行搜索
Native.faiss_ivfflat_search(indexPtr, normalizedQuery, topK, ids, distances);
List<SearchResult.Item> items = new ArrayList<>();
for (int i = 0; i < topK; i++) {
if (ids[i] >= 0) { // -1表示无效结果
items.add(new SearchResult.Item((int)ids[i], distances[i]));
}
}
return new SearchResult(items);
}
private float[] normalize(float[] v) {
float norm = 0;
for (float f : v) norm += f * f;
norm = (float) Math.sqrt(norm);
if (norm == 0) return v;
float[] r = new float[v.length];
for (int i = 0; i < v.length; i++) r[i] = v[i] / norm;
return r;
}
}
4.3 Flink实时特征更新
用户行为事件通过Flink流处理,秒级更新Redis中的特征:
// Flink实时特征更新(消费Kafka行为事件 → 更新Redis)
public class FlinkFeatureEngine {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(32);
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 消费Kafka行为事件
DataStream<UserBehavior> events = env
.addSource(new FlinkKafkaConsumer<>(
"user_behavior_events",
new UserBehaviorDeserializer(),
kafkaProps
))
.uid("kafka-source")
.name("Kafka行为事件源");
// 实时特征更新
events
.keyBy(e -> e.userId) // 按用户ID分桶
.process(new FeatureUpdateProcessFunction())
.uid("feature-update")
.name("实时特征更新")
.addSink(new RedisSink<>(redisConfig));
}
}
public class FeatureUpdateProcessFunction
extends KeyedProcessFunction<String, UserBehavior, FeatureUpdate> {
private transient RedisCommands<String, String> redis;
@Override
public void open(Configuration params) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMinIdle(50);
poolConfig.setMaxTotal(200);
JedisPooled<String> jedis = new JedisPooled<>(poolConfig);
this.redis = jedis.connect(new RedisOptions()).sync();
}
@Override
public void processElement(UserBehavior event, Context ctx, Collector<FeatureUpdate> out) {
String featureKey = "feature:user:" + event.userId + ":" + event.featureType;
// 根据行为类型更新对应特征
switch (event.behaviorType) {
case "CLICK":
// 累计点击次数
redis.hincrBy(featureKey, "click_count", 1);
redis.expire(featureKey, 86400); // 24h过期
// 记录最近点击时间
redis.hset(featureKey, "last_click_time", String.valueOf(event.timestamp));
break;
case "VIEW_DETAIL":
// 累计浏览次数(同一商品重复浏览也计数)
redis.hincrBy(featureKey, "detail_view_count:" + event.itemId, 1);
break;
case "ADD_CART":
// 标记加购意图
redis.sadd("intent:cart:" + event.userId, event.itemId);
break;
}
// 输出特征更新事件(供下游使用)
out.collect(new FeatureUpdate(
event.userId,
event.featureType,
event.itemId,
event.timestamp
));
}
}
五、性能指标与成果
5.1 核心业务指标
5.2 各层性能数据
| 阶段 | 候选集规模 | 延迟P99 | 技术方案 |
|---|---|---|---|
| 召回层 | 100万 → 5000 | 15ms | Faiss IVFFlat + 多路并行 |
| 粗排层 | 5000 → 500 | 20ms | 双塔模型 + 实时特征 |
| 精排层 | 500 → 50 | 30ms | DIN深度模型 |
| 重排层 | 50 → 10 | 5ms | DPP多样性 + 打散 |
| 端到端 | 100万 → 10 | ≤100ms | 全链路优化 |
💡 经验一:漏斗逐级降维,平衡效果与性能
推荐系统的每一级漏斗,都需要精心设计"保留多少候选"——保留太多会导致下游计算爆炸,保留太少会漏掉好内容。我们的经验是:召回层保留候选集千分之五(百万→五千),粗排保留百分之一(五千→五十),精排保留百分之十(五十→五),重排根据业务需求灵活调整。这个比例不是固定的,需要通过AB测试持续优化。
💡 经验二:实时特征是推荐效果的"秘密武器"
离线训练的模型,使用的特征往往是"T-1天"甚至"T-7天"的。而用户的兴趣变化非常快——早上看了科技视频,晚上可能就在看美食。实时特征工程让推荐系统具备了"感知当下"的能力。我们实测发现,加入实时点击特征后,点击率提升约15%,但同时也要防止"特征穿越"(使用了当前时刻之后的数据),这是一个需要严格测试验证的边界。
💡 经验三:向量索引的更新策略决定推荐的新鲜度
Faiss索引的全量重建代价很高(十亿级向量,重建一次需要数小时),但完全不做更新又会导致新内容永远得不到曝光。我们的实践是:白天增量更新(每小时追加新内容的向量),凌晨低峰期做全量重建。同时,使用"热度boost"机制——新内容的曝光和点击会快速累积,这些行为数据可以实时更新其向量表示,形成正反馈循环。