对应项目网站: https://github.com/xai-org/x-algorithm
项目概述
该项目是一个开源的 X(原 Twitter)“For You” 信息流推荐算法实现,展示了现代社交媒体平台如何利用深度学习技术为数亿用户提供个性化内容推荐。该项目结合了内网内容(你关注的账号)和网外内容(通过机器学习发现的相关内容),使用基于 Grok 的 Transformer 模型进行统一排序。
技术栈:
- 后端服务:Rust(高性能、内存安全)
- 机器学习:Python + JAX(Google 的高性能数值计算库)
- 模型架构:改编自 xAI 开源的 Grok-1 Transformer
系统架构设计
三大核心组件深度解析
1. Home Mixer(编排层)
Home Mixer 是整个推荐系统的”大脑”,负责编排和协调所有推荐流程。它采用高度模块化的候选管道(Candidate Pipeline)框架。
1.1 Query Hydrators(查询水化器)
作用:在推荐开始前,收集用户的完整上下文信息。
实现的水化器:
UserActionSeqQueryHydrator:
- 从 UAS(User Action Sequence)服务获取用户最近的互动历史
- 包括:点赞、回复、转发、点击等行为序列
- 最多获取 32 条历史记录
- 这些历史将作为 Transformer 的上下文输入
UserFeaturesQueryHydrator:
- 获取用户的社交关系(关注列表、粉丝列表)
- 获取用户偏好设置(静音关键词、屏蔽账号等)
- 从 Strato 分布式存储中读取
关键优化:这些水化器并行执行,减少延迟。
1.2 Candidate Sources(候选源)
PhoenixSource(网外内容):
pub struct PhoenixSource {
phoenix_retrieval_client: Arc<dyn PhoenixRetrievalClient>,
}- 调用 Phoenix 双塔检索模型
- 从全局语料库中发现相关内容
- 返回 Top-1000 候选
ThunderSource(网内内容):
pub struct ThunderSource {
thunder_client: Arc<ThunderClient>,
}- 查询用户关注账号的最新帖子
- 直接从内存读取,超低延迟
- 按帖子类型分类(原创、转发、视频)
并行策略:两个源同时查询,结果合并后进入下一阶段。
1.3 Candidate Hydrators(候选水化器)
逐步丰富候选内容的元数据:
| Hydrator | 功能 | 数据源 |
|---|---|---|
InNetworkCandidateHydrator | 标记候选是否来自关注账号 | 本地计算 |
CoreDataCandidateHydrator | 获取帖子核心数据(文本、媒体等) | TES (Tweet Entity Service) |
GizmoduckCandidateHydrator | 获取作者信息(用户名、认证状态) | Gizmoduck 用户服务 |
VideoDurationCandidateHydrator | 获取视频时长 | TES |
SubscriptionHydrator | 检查付费订阅内容访问权限 | TES |
关键优化:这些水化器也可以并行执行,但有依赖关系的顺序执行。
1.4 Filters(过滤器)
Pre-Scoring Filters(评分前过滤):
| 优先级 | Filter | 过滤原因 | 节省成本 |
|---|---|---|---|
| 1 | DropDuplicatesFilter | 去除重复帖子 ID | 减少后续处理 |
| 2 | CoreDataHydrationFilter | 去除水化失败的候选 | 避免无效数据 |
| 3 | AgeFilter | 去除超过 24 小时的旧帖 | 保证时效性 |
| 4 | SelfTweetFilter | 去除用户自己的帖子 | 避免自推荐 |
| 5 | RetweetDeduplicationFilter | 去重同一内容的多次转发 | 提高多样性 |
| 6 | IneligibleSubscriptionFilter | 去除用户无权访问的付费内容 | 避免付费墙 |
| 7 | PreviouslySeenPostsFilter | 去除用户已看过的帖子 | 减少重复 |
| 8 | PreviouslyServedPostsFilter | 去除本次会话已推荐的帖子 | 会话去重 |
| 9 | MutedKeywordFilter | 去除包含用户静音关键词的帖子 | 尊重用户偏好 |
| 10 | AuthorSocialgraphFilter | 去除被屏蔽/静音作者的帖子 | 社交图谱过滤 |
关键优化:过滤器按照从便宜到昂贵的顺序执行,早期过滤可以大幅减少后续计算量。
1.5 Scorers(评分器链)
顺序应用多个评分器,每个评分器添加新的分数字段:
// 1. Phoenix Scorer - ML 模型预测
PhoenixScorer → 预测 14+ 种互动概率
↓
// 2. Weighted Scorer - 加权组合
WeightedScorer → 计算 weighted_score
↓
// 3. Author Diversity Scorer - 多样性调整
AuthorDiversityScorer → 降低重复作者分数
↓
// 4. OON Scorer - 网外内容调整
OONScorer → 调整网外内容权重WeightedScorer 详细实现:
fn compute_weighted_score(candidate: &PostCandidate) -> f64 {
let s = &candidate.phoenix_scores;
// 正面信号(正权重)
let positive_score =
s.favorite_score * FAVORITE_WEIGHT + // 0.5
s.reply_score * REPLY_WEIGHT + // 1.0
s.retweet_score * RETWEET_WEIGHT + // 1.0
s.click_score * CLICK_WEIGHT + // 0.1
s.dwell_score * DWELL_WEIGHT + // 0.05
s.share_score * SHARE_WEIGHT + // 2.0
s.follow_author_score * FOLLOW_WEIGHT; // 5.0
// 负面信号(负权重)
let negative_score =
s.not_interested_score * NOT_INTERESTED_WEIGHT + // -10.0
s.block_author_score * BLOCK_WEIGHT + // -50.0
s.mute_author_score * MUTE_WEIGHT + // -30.0
s.report_score * REPORT_WEIGHT; // -100.0
positive_score + negative_score
}关键优化:
- 负面信号权重远大于正面信号,强力抑制用户可能讨厌的内容
- 深度互动(回复、分享、关注)权重高于浅度互动(点赞、点击)
1.6 Selector(选择器)
pub struct TopKScoreSelector;- 按
weighted_score降序排序 - 选择 Top-K(默认 K=50)
1.7 Post-Selection Processing(后选择处理)
VFCandidateHydrator:
- 调用 Visibility Filtering 服务
- 获取内容安全标签
Post-Selection Filters:
| Filter | 功能 |
|---|---|
VFFilter | 过滤被标记为垃圾/暴力/违规的内容 |
DedupConversationFilter | 去重同一对话的多个分支 |
为什么后置过滤?
- VF 服务调用成本高,只对 Top-K 候选调用
- 避免对所有候选进行昂贵的内容审核
2. Thunder(内存存储引擎)
Thunder 是专为推荐系统设计的高性能内存数据库,解决了传统数据库的延迟问题。
2.1 数据结构设计
pub struct PostStore {
// 完整帖子数据:post_id → LightPost
posts: Arc<DashMap<i64, LightPost>>,
// 按用户索引的原创帖子:user_id → [TinyPost]
original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
// 按用户索引的回复/转发:user_id → [TinyPost]
secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
// 按用户索引的视频帖子:user_id → [TinyPost]
video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
// 已删除帖子标记
deleted_posts: Arc<DashMap<i64, bool>>,
retention_seconds: u64,
request_timeout: Duration,
}TinyPost vs LightPost:
TinyPost:仅存储(post_id, created_at),极小内存占用LightPost:存储完整帖子元数据(文本、媒体、作者 ID 等)
设计优势:
- 用户索引只存储 TinyPost 引用,节省内存
- 实际数据在
posts表中,避免重复存储 - 使用
VecDeque实现高效的 FIFO 队列
2.2 实时数据摄取
// Kafka 消费者监听帖子事件
pub struct TweetEventsListener {
kafka_consumer: StreamConsumer,
post_store: Arc<PostStore>,
}
impl TweetEventsListener {
async fn consume_events(&self) {
loop {
match self.kafka_consumer.recv().await {
Ok(message) => {
let event = parse_tweet_event(message);
match event {
TweetEvent::Create(post) => {
self.post_store.add_post(post);
}
TweetEvent::Delete(post_id) => {
self.post_store.mark_as_deleted(post_id);
}
}
}
Err(e) => log::error!("Kafka error: {}", e),
}
}
}
}关键特性:
- 毫秒级延迟:从帖子发布到可被检索 < 100ms
- 自动分类:根据帖子类型自动放入不同索引
- 容错设计:Kafka 消费失败不影响查询服务
2.3 查询接口
impl PostStore {
/// 获取指定用户列表的最新帖子
pub fn get_posts_by_users(
&self,
user_ids: &[i64],
max_posts_per_user: usize,
) -> Vec<LightPost> {
let start = Instant::now();
let mut results = Vec::new();
for user_id in user_ids {
// 检查超时
if self.request_timeout > 0 && start.elapsed() > self.request_timeout {
break;
}
// 从原创帖子索引获取
if let Some(posts) = self.original_posts_by_user.get(user_id) {
for tiny_post in posts.iter().take(max_posts_per_user) {
if let Some(full_post) = self.posts.get(&tiny_post.post_id) {
results.push(full_post.clone());
}
}
}
}
results
}
}性能特征:
- P50 延迟:< 1ms
- P99 延迟:< 5ms
- 吞吐量:100K+ QPS(单机)
2.4 内存管理
自动清理机制:
async fn cleanup_old_posts(&self) {
let now = SystemTime::now();
let cutoff = now - Duration::from_secs(self.retention_seconds);
// 遍历所有帖子,删除过期数据
self.posts.retain(|_, post| {
post.created_at > cutoff
});
// 清理用户索引中的过期引用
for mut entry in self.original_posts_by_user.iter_mut() {
entry.retain(|tiny| tiny.created_at > cutoff);
}
}内存占用估算:
- 1 亿条帖子(24 小时)
- 每条 LightPost ≈ 200 bytes
- 每条 TinyPost ≈ 16 bytes
- 总内存 ≈ 20GB - 30GB(可单机部署)
3. Phoenix(机器学习核心)
Phoenix 是推荐系统的智能引擎,包含检索和排序两个模型。
3.1 检索模型 - 双塔架构
为什么需要双塔?
- 候选数量:数千万 - 数亿条帖子
- 延迟要求:< 50ms
- 传统 Transformer:O(N) 复杂度,无法实时计算
双塔解决方案:
离线阶段:
候选塔(Candidate Tower)对所有帖子进行编码
→ 生成帖子向量库 [N, D]
→ 存入向量数据库(如 FAISS、ScaNN)
在线阶段:
用户塔(User Tower)对当前用户编码
→ 生成用户向量 [1, D]
→ 与向量库进行 ANN 搜索
→ 返回 Top-1000 最相似帖子用户塔架构:
class UserTower(hk.Module):
def __call__(self, user_features, user_history):
# 1. 用户特征嵌入
user_emb = self.user_embedding(user_features) # [B, D]
# 2. 历史序列编码
history_embs = self.post_embedding(user_history) # [B, S, D]
# 3. Transformer 编码
transformer_out = self.transformer(
jnp.concatenate([user_emb, history_embs], axis=1)
) # [B, S+1, D]
# 4. 池化为单一向量
user_vector = transformer_out[:, 0, :] # 取 CLS token
# 5. L2 归一化
user_vector = user_vector / jnp.linalg.norm(user_vector, axis=-1, keepdims=True)
return user_vector # [B, D]候选塔架构:
class CandidateTower(hk.Module):
def __call__(self, post_features):
# 1. 帖子内容嵌入
post_emb = self.post_embedding(post_features) # [N, D]
# 2. 作者信息嵌入
author_emb = self.author_embedding(post_features.author_id) # [N, D]
# 3. 组合嵌入
combined = post_emb + author_emb
# 4. MLP 投影
candidate_vector = self.mlp(combined) # [N, D]
# 5. L2 归一化
candidate_vector = candidate_vector / jnp.linalg.norm(
candidate_vector, axis=-1, keepdims=True
)
return candidate_vector # [N, D]相似度计算:
# 用户向量:[B, D]
# 候选向量:[N, D]
scores = jnp.dot(user_vector, candidate_vectors.T) # [B, N]
# Top-K 检索
top_k_indices = jnp.argsort(scores, axis=-1)[:, -1000:] # [B, 1000]训练目标:
- 正样本:用户实际互动过的帖子
- 负样本:随机采样 + 批内负采样
- 损失函数:Softmax Cross-Entropy
3.2 排序模型 - Transformer with Candidate Isolation
模型输入:
class RecsysBatch:
# 用户特征
user_hashes: [B, num_user_hashes]
# 历史序列(最多 32 条)
history_post_hashes: [B, S, num_item_hashes]
history_author_hashes: [B, S, num_author_hashes]
history_actions: [B, S] # 用户在历史帖子上的行为
history_product_surface: [B, S] # 产品界面(Timeline/Search/Notification)
# 候选序列(8-16 条)
candidate_post_hashes: [B, C, num_item_hashes]
candidate_author_hashes: [B, C, num_author_hashes]
candidate_product_surface: [B, C]哈希嵌入查找:
def hash_embedding_lookup(hashes, embedding_table, num_hashes):
"""
hashes: [B, num_hashes] 例如 [[123, 456], [789, 012]]
embedding_table: [vocab_size, D]
"""
# 查找每个哈希值的嵌入
embeddings = embedding_table[hashes] # [B, num_hashes, D]
# 拼接多个哈希嵌入
concatenated = embeddings.reshape(B, num_hashes * D)
# 投影回原始维度
combined = jnp.dot(concatenated, projection_matrix) # [B, D]
return combined为什么使用多哈希?
- 减少冲突:单哈希函数容易碰撞,多哈希降低冲突概率
- 增强表达:不同哈希函数捕获不同特征
- 处理长尾:罕见词汇也能得到合理表示
Transformer 前向传播:
def forward(batch, embeddings):
# 1. 嵌入层
user_emb = block_user_reduce(batch.user_hashes, embeddings.user_embeddings)
history_embs = block_history_reduce(
batch.history_post_hashes,
embeddings.history_post_embeddings,
embeddings.history_author_embeddings,
batch.history_actions,
batch.history_product_surface,
)
candidate_embs = block_candidate_reduce(
batch.candidate_post_hashes,
embeddings.candidate_post_embeddings,
embeddings.candidate_author_embeddings,
batch.candidate_product_surface,
)
# 2. 拼接输入序列
# [User | History_1 ... History_S | Candidate_1 ... Candidate_C]
input_seq = jnp.concatenate([user_emb, history_embs, candidate_embs], axis=1)
# 3. 创建候选隔离注意力掩码
attn_mask = make_recsys_attn_mask(
seq_len=1 + S + C,
candidate_start_offset=1 + S,
)
# 4. Transformer 编码
transformer_out = transformer(input_seq, attn_mask)
# 5. 提取候选输出
candidate_outputs = transformer_out[:, 1+S:, :] # [B, C, D]
# 6. 多任务预测头
logits = prediction_head(candidate_outputs) # [B, C, num_actions]
return logits输出格式:
logits: [B, C, num_actions]
# B = batch_size
# C = num_candidates
# num_actions = 14(favorite, reply, repost, ...)
# 转换为概率
probs = jax.nn.sigmoid(logits) # [B, C, 14]核心技术亮点深度解析
🌟 亮点 1:候选隔离注意力机制(Candidate Isolation)
问题背景
在传统 Transformer 推荐模型中,所有候选内容在同一批次中进行处理:
输入序列:[User, History_1, ..., History_32, Cand_1, Cand_2, ..., Cand_8]
↑ ↑
| |
这些候选可以互相关注(Full Attention)问题:
- 批次依赖:Cand_1 的分数会受到 Cand_2, Cand_3 等的影响
- 不一致性:同一条帖子在不同批次中可能得到不同分数
- 无法缓存:由于批次依赖,候选分数无法预计算缓存
Phoenix 的解决方案
def make_recsys_attn_mask(seq_len, candidate_start_offset):
"""创建推荐系统专用的注意力掩码"""
# 1. 从因果掩码开始(下三角矩阵)
causal_mask = jnp.tril(jnp.ones((1, 1, seq_len, seq_len)))
# 2. 将候选区域的非对角线元素置 0(禁止候选间互相关注)
attn_mask = causal_mask.at[:, :, candidate_start_offset:,
candidate_start_offset:].set(0)
# 3. 恢复候选的自注意力(对角线元素)
candidate_indices = jnp.arange(candidate_start_offset, seq_len)
attn_mask = attn_mask.at[:, :, candidate_indices, candidate_indices].set(1)
return attn_mask注意力掩码可视化:
Keys (what we attend TO)
──────────────────────────────────▶
│ U │ History │ Candidates │
┌───┼───┼───────────┼────────────────┤
Q │ U │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✗ ✗ ✗ ✗ │
u │ H │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✗ ✗ ✗ ✗ │
e │ i │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✗ ✗ ✗ ✗ │
r │ s │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✗ ✗ ✗ ✗ │
i ├───┼───┼───────────┼────────────────┤
e │ C │ ✓ │ ✓ ✓ ✓ ✓ │ ✓ ✗ ✗ ✗ ✗ ✗ │ ← Cand_1 只能看自己
s │ a │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✓ ✗ ✗ ✗ ✗ │ ← Cand_2 只能看自己
│ n │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✓ ✗ ✗ ✗ │ ← Cand_3 只能看自己
│ d │ ✓ │ ✓ ✓ ✓ ✓ │ ✗ ✗ ✗ ✓ ✗ ✗ │
└───┴───┴───────────┴────────────────┘带来的优势
评分一致性:
# 同一条帖子在不同批次中得到相同分数 batch_1 = [user, history, post_A, post_B, post_C] batch_2 = [user, history, post_A, post_X, post_Y] # post_A 在两个批次中的分数完全一致 score(post_A | batch_1) == score(post_A | batch_2)分数缓存:
# 可以预先计算热门帖子的分数 cache_key = hash(user_id, user_history, post_id) if cache_key in cache: return cache[cache_key] else: score = model.predict(user, history, post) cache[cache_key] = score return score并行推理:
# 可以将候选分片,并行推理 candidates = [post_1, post_2, ..., post_1000] # 分成 10 批,每批 100 个候选 batches = chunk(candidates, 100) # 并行计算(每批独立) scores = parallel_map(lambda batch: model.predict(user, history, batch), batches)A/B 测试友好:
- 候选分数独立,方便进行对照实验
- 可以精确测量单个候选的效果
与传统方法对比
| 维度 | 传统 Full Attention | Candidate Isolation |
|---|---|---|
| 评分一致性 | ❌ 批次相关 | ✅ 批次无关 |
| 缓存能力 | ❌ 无法缓存 | ✅ 可缓存 |
| 并行推理 | ❌ 必须整批 | ✅ 可分片 |
| 模型表达力 | 高(候选间交互) | 中(无候选间交互) |
| 训练复杂度 | 高 | 低 |
权衡:虽然候选隔离降低了模型的表达能力(候选无法互相参考),但在生产环境中,一致性和可缓存性 远比微小的模型性能提升更重要。
🌟 亮点 2:零手工特征工程
传统推荐系统的特征工程噩梦
传统推荐系统需要设计数百个手工特征:
# 传统特征示例
features = {
# 内容特征
'post_age_hours': (now - post.created_at).hours,
'post_length': len(post.text),
'has_image': int(post.has_image),
'has_video': int(post.has_video),
'num_hashtags': len(post.hashtags),
'num_mentions': len(post.mentions),
# 作者特征
'author_follower_count': author.follower_count,
'author_verified': int(author.verified),
'author_creation_date': (now - author.created_at).days,
'author_post_frequency': author.posts_last_30d / 30,
# 互动特征
'post_like_count': post.like_count,
'post_retweet_count': post.retweet_count,
'post_reply_count': post.reply_count,
'engagement_rate': post.engagements / post.impressions,
# 用户-内容匹配特征
'user_author_similarity': cosine_similarity(user.interests, author.topics),
'topic_match_score': compute_topic_match(user.history, post.topics),
'language_match': int(user.language == post.language),
# ... 100+ 更多特征
}问题:
- 🔥 维护成本高:每个特征需要专门的数据管道
- 🔥 领域知识依赖:需要深入理解业务才能设计好特征
- 🔥 扩展困难:新增特征需要改造整个系统
- 🔥 时效性差:特征计算可能有延迟(如 follower_count)
Phoenix 的端到端学习
# Phoenix 只需要原始 ID 和行为序列
inputs = {
'user_id': user_id,
'history_post_ids': [post_1, post_2, ..., post_32],
'history_actions': [like, retweet, reply, ...],
'candidate_post_ids': [cand_1, cand_2, ..., cand_8],
}
# 模型自动学习特征表示
predictions = model.predict(inputs)模型如何学习隐式特征?
嵌入层捕获实体特征:
# 帖子嵌入自动编码了:话题、风格、质量等 post_embedding = embedding_table[post_id] # 学习到的 128 维向量 # 示例:相似话题的帖子嵌入接近 sports_post_1: [0.8, 0.1, -0.3, ...] sports_post_2: [0.7, 0.2, -0.2, ...] politics_post: [-0.5, 0.9, 0.4, ...]Transformer 捕获序列模式:
# 自动学习用户的兴趣演变 history = [post_1, post_2, post_3, ...] # Transformer 发现模式: # - 用户最近对科技话题感兴趣 # - 用户倾向于在早晨阅读新闻 # - 用户喜欢有图片的长文多任务学习捕获行为偏好:
# 同时预测多个行为,模型学到不同行为的特征 outputs = { 'P(like)': 0.8, # 用户可能点赞(表示浅层兴趣) 'P(reply)': 0.2, # 用户不太会回复(内容不够引人讨论) 'P(share)': 0.1, # 用户不太会分享(内容不够优质) } # 模型自动区分了"容易点赞"和"值得分享"的内容
对比实验结果
根据 README 中的描述:
“We have eliminated every single hand-engineered feature and most heuristics from the system.”
带来的好处:
- ✅ 数据管道简化:只需 ID 流和行为日志
- ✅ 延迟降低:无需等待特征计算
- ✅ 更强泛化:模型自动发现特征,不受人类认知偏见限制
- ✅ 快速迭代:新业务场景无需重新设计特征
🌟 亮点 3:多动作预测(Multi-Task Learning)
为什么不能只预测”相关性”?
单一相关性分数的问题:
# 传统单目标模型
relevance_score = model.predict(user, post) # 只有一个数字
# 问题:无法区分不同类型的相关性
post_A: relevance = 0.8 # 用户会看,但可能不喜欢
post_B: relevance = 0.7 # 用户不仅会看,还会分享Phoenix 的多任务学习架构
class PhoenixModel:
def __call__(self, user, history, candidates):
# 共享 Transformer 编码器
embeddings = self.transformer(user, history, candidates)
# 14 个独立的预测头
predictions = {
# 正面信号
'P(favorite)': sigmoid(self.favorite_head(embeddings)),
'P(reply)': sigmoid(self.reply_head(embeddings)),
'P(repost)': sigmoid(self.repost_head(embeddings)),
'P(quote)': sigmoid(self.quote_head(embeddings)),
'P(click)': sigmoid(self.click_head(embeddings)),
'P(profile_click)': sigmoid(self.profile_click_head(embeddings)),
'P(video_view)': sigmoid(self.video_view_head(embeddings)),
'P(photo_expand)': sigmoid(self.photo_expand_head(embeddings)),
'P(share)': sigmoid(self.share_head(embeddings)),
'P(dwell_2s+)': sigmoid(self.dwell_head(embeddings)),
'P(follow_author)': sigmoid(self.follow_head(embeddings)),
# 负面信号
'P(not_interested)': sigmoid(self.not_interested_head(embeddings)),
'P(block_author)': sigmoid(self.block_head(embeddings)),
'P(mute_author)': sigmoid(self.mute_head(embeddings)),
'P(report)': sigmoid(self.report_head(embeddings)),
}
return predictions加权组合策略
fn compute_weighted_score(phoenix_scores: &PhoenixScores) -> f64 {
// 正面信号(正权重)
let positive =
phoenix_scores.favorite_score * 0.5 +
phoenix_scores.reply_score * 1.0 + // 深度互动权重更高
phoenix_scores.repost_score * 1.0 +
phoenix_scores.quote_score * 0.8 +
phoenix_scores.click_score * 0.1 + // 浅层互动权重较低
phoenix_scores.share_score * 2.0 + // 分享是强信号
phoenix_scores.follow_author_score * 5.0; // 关注作者是最强信号
// 负面信号(负权重,绝对值很大)
let negative =
phoenix_scores.not_interested_score * -10.0 +
phoenix_scores.block_author_score * -50.0 +
phoenix_scores.mute_author_score * -30.0 +
phoenix_scores.report_score * -100.0; // 举报是最强负信号
positive + negative
}权重设计哲学:
- 深度互动(reply, share, follow)> 浅层互动(like, click)
- 负面信号权重远大于正面信号(宁可少推荐,不要推荐错)
- 权重可以通过线上 A/B 测试动态调整
多任务学习的优势
更丰富的用户理解:
# 示例:两个相似的帖子 post_A: P(like) = 0.9 # 用户很可能点赞 P(reply) = 0.1 # 但不太会回复 → 浅层兴趣,娱乐内容 post_B: P(like) = 0.6 # 用户可能点赞 P(reply) = 0.7 # 很可能回复 P(share) = 0.5 # 可能分享 → 深度兴趣,高质量内容 # 最终分数:post_B > post_A负面信号过滤:
post_C: P(like) = 0.8 P(block) = 0.3 # 有 30% 概率被屏蔽 → 最终分数 = 0.8 * 0.5 + 0.3 * (-50) = -14.6(不推荐)训练信号丰富:
# 传统单任务:只有点击/未点击两种标签 labels = [1, 0, 1, 0, ...] # 信号稀疏 # 多任务:14 种行为都是训练信号 labels = { 'like': [1, 0, 1, 0, ...], 'reply': [0, 0, 1, 0, ...], 'share': [0, 0, 0, 0, ...], ... } # 即使用户没有点赞,"reply" 和 "share" 也提供了训练信号
🌟 亮点 4:可组合的管道架构
设计模式:责任链 + 依赖注入
pub trait CandidatePipeline<Q, C> {
// 定义管道的各个阶段
fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
fn sources(&self) -> &[Box<dyn Source<Q, C>>];
fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
fn selector(&self) -> &dyn Selector<Q, C>;
// 执行管道的模板方法
async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
let query = self.hydrate_query(query).await;
let candidates = self.fetch_candidates(&query).await;
let candidates = self.hydrate(&query, candidates).await;
let candidates = self.filter(&query, candidates).await;
let candidates = self.score(&query, candidates).await;
let candidates = self.select(&query, candidates);
PipelineResult { candidates, query }
}
}并行执行优化
async fn hydrate_query(&self, mut query: Q) -> Q {
// 所有 query hydrators 并行执行
let hydrators = self.query_hydrators();
let futures: Vec<_> = hydrators
.iter()
.map(|h| h.hydrate(&query))
.collect();
let results = join_all(futures).await;
// 合并结果到 query
for result in results {
query.merge(result);
}
query
}
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
// 所有 sources 并行查询
let sources = self.sources();
let futures: Vec<_> = sources
.iter()
.map(|s| s.fetch(query))
.collect();
let results = join_all(futures).await;
// 合并所有候选
results.into_iter().flatten().collect()
}性能提升:
- 串行执行:100ms (query hydration) + 50ms (sources) = 150ms
- 并行执行:max(100ms, 50ms) = 100ms
- 延迟降低 33%
易扩展性示例
添加新数据源:
pub struct TrendingSource {
trending_service: Arc<TrendingService>,
}
#[async_trait]
impl Source<ScoredPostsQuery, PostCandidate> for TrendingSource {
async fn fetch(&self, query: &ScoredPostsQuery) -> Vec<PostCandidate> {
let trending_posts = self.trending_service
.get_trending_posts(query.user_location)
.await?;
trending_posts.into_iter()
.map(|p| PostCandidate::from_trending(p))
.collect()
}
}
// 在 pipeline 中注册(无需修改框架代码)
let sources = vec![
Box::new(ThunderSource { ... }),
Box::new(PhoenixSource { ... }),
Box::new(TrendingSource { ... }), // 新增数据源
];添加新过滤器:
pub struct LanguageFilter {
supported_languages: HashSet<String>,
}
#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for LanguageFilter {
async fn filter(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Result<Vec<bool>, String> {
let keep_flags = candidates
.iter()
.map(|c| {
c.language
.as_ref()
.map(|lang| self.supported_languages.contains(lang))
.unwrap_or(false)
})
.collect();
Ok(keep_flags)
}
}
// 在 pipeline 中注册
filters.push(Box::new(LanguageFilter { ... }));错误处理策略
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
let sources = self.sources();
let futures: Vec<_> = sources.iter().map(|s| s.fetch(query)).collect();
let results = join_all(futures).await;
let mut all_candidates = Vec::new();
for (idx, result) in results.into_iter().enumerate() {
match result {
Ok(candidates) => {
all_candidates.extend(candidates);
}
Err(e) => {
// 单个源失败不影响整体
log::warn!("Source {} failed: {}", idx, e);
emit_metric("source_failure", &[("source_id", idx.to_string())]);
}
}
}
all_candidates
}优雅降级:即使某个数据源失败,系统仍能返回其他源的结果。
🌟 亮点 5:基于哈希的嵌入查找(Hash Embeddings)
传统嵌入表的问题
# 传统嵌入表
vocab_size = 100_000_000 # 1 亿个帖子
embedding_dim = 128
embedding_table = np.zeros((vocab_size, embedding_dim))
# 内存占用:100M * 128 * 4 bytes = 51.2 GB(单个表)问题:
- 🔥 内存爆炸:ID 空间巨大(数亿帖子 × 数亿用户)
- 🔥 长尾稀疏:大部分 ID 很少出现,嵌入得不到充分训练
- 🔥 无法泛化:新 ID 没有嵌入向量(冷启动)
Hash Embedding 解决方案
class HashEmbedding(hk.Module):
def __init__(self, num_hashes=4, hash_vocab_size=1_000_000, emb_dim=128):
self.num_hashes = num_hashes
self.hash_vocab_size = hash_vocab_size
self.emb_dim = emb_dim
# 创建多个小的嵌入表
self.embedding_tables = [
hk.get_parameter(
f"hash_embedding_{i}",
shape=[hash_vocab_size, emb_dim],
init=hk.initializers.TruncatedNormal(stddev=0.01),
)
for i in range(num_hashes)
]
def __call__(self, ids):
"""
ids: [B, S] 原始 ID(可以非常大)
returns: [B, S, emb_dim]
"""
embeddings = []
for i, embedding_table in enumerate(self.embedding_tables):
# 使用不同的哈希函数
hashed_ids = hash_function(ids, seed=i) % self.hash_vocab_size
emb = embedding_table[hashed_ids] # [B, S, emb_dim]
embeddings.append(emb)
# 组合多个哈希嵌入
combined = sum(embeddings) / self.num_hashes
return combined内存对比:
# 传统方法
memory_traditional = 100_000_000 * 128 * 4 = 51.2 GB
# Hash Embedding(4 个哈希表)
memory_hash = 4 * 1_000_000 * 128 * 4 = 2.05 GB
# 节省 96% 内存!多哈希函数的优势
1. 降低冲突概率:
# 单哈希:两个不同 ID 可能冲突
hash(post_123, seed=0) % 1M = 456789
hash(post_456, seed=0) % 1M = 456789 # 冲突!
# 多哈希:冲突概率指数下降
P(collision with 1 hash) = 1 / 1M
P(collision with 4 hashes) = (1 / 1M) ^ 4 ≈ 02. 更丰富的表示:
# 每个哈希函数捕获不同方面
hash_0(post_id) → 捕获话题特征
hash_1(post_id) → 捕获作者特征
hash_2(post_id) → 捕获时间特征
hash_3(post_id) → 捕获格式特征
# 组合后得到全面的表示
final_embedding = (emb_0 + emb_1 + emb_2 + emb_3) / 43. 天然处理冷启动:
# 新帖子(从未见过的 ID)
new_post_id = 999_999_999
# 仍然可以获得合理的嵌入(基于哈希值)
embedding = hash_embedding(new_post_id)
# 即使是全新的 ID,由于哈希函数的均匀性,
# 它会映射到已训练的嵌入空间中的某个位置Phoenix 中的实际应用
class RecsysModel:
def __init__(self, config):
# 用户 Hash Embedding(2 个哈希)
self.user_embedding = HashEmbedding(
num_hashes=config.num_user_hashes, # 2
hash_vocab_size=10_000_000,
emb_dim=config.emb_size,
)
# 帖子 Hash Embedding(2 个哈希)
self.post_embedding = HashEmbedding(
num_hashes=config.num_item_hashes, # 2
hash_vocab_size=50_000_000,
emb_dim=config.emb_size,
)
# 作者 Hash Embedding(2 个哈希)
self.author_embedding = HashEmbedding(
num_hashes=config.num_author_hashes, # 2
hash_vocab_size=10_000_000,
emb_dim=config.emb_size,
)总内存占用:
用户表:2 * 10M * 128 * 4 bytes = 10 GB
帖子表:2 * 50M * 128 * 4 bytes = 50 GB
作者表:2 * 10M * 128 * 4 bytes = 10 GB
──────────────────────────────────────
总计:70 GB(可在单 GPU 上训练)相比传统方法(数百 GB),节省了 70-80% 内存。
项目的核心优化点
1. 延迟优化
1.1 并行执行
// 所有独立阶段并行执行
async fn execute_pipeline(&self, query: Q) {
// 并行查询水化
let hydrators = self.query_hydrators();
let hydration_futures = hydrators.iter().map(|h| h.hydrate(&query));
let hydrated_query = join_all(hydration_futures).await;
// 并行候选源查询
let sources = self.sources();
let source_futures = sources.iter().map(|s| s.fetch(&hydrated_query));
let candidates = join_all(source_futures).await;
}收益:
- 串行:150ms → 并行:60ms
- P99 延迟降低 60%
1.2 早期过滤
// 过滤器按成本从低到高排序
let filters = vec![
DropDuplicatesFilter, // 成本:O(N),内存操作
AgeFilter, // 成本:O(N),简单比较
SelfTweetFilter, // 成本:O(N),ID比较
AuthorSocialgraphFilter, // 成本:O(N*M),社交图谱查询
MutedKeywordFilter, // 成本:O(N*K),文本匹配
VFFilter, // 成本:O(N),RPC调用(最贵)
];
// 早期过滤减少后续处理量
candidates: 1000 → 800 (DropDuplicates)
→ 600 (AgeFilter)
→ 550 (SelfTweetFilter)
→ 500 (AuthorSocialgraph)
→ 450 (MutedKeyword)
→ 400 (VFFilter,只处理 400 个,而不是 1000 个)收益:减少 60% 的昂贵 RPC 调用
1.3 Thunder 内存存储
// 传统方法:查询数据库
let posts = database.query("SELECT * FROM posts WHERE author_id IN (?)", following_ids).await;
// 延迟:20-50ms(P99)
// Thunder:内存查询
let posts = thunder_store.get_posts_by_users(&following_ids);
// 延迟:< 1ms(P99)
// 延迟降低 95%+2. 吞吐量优化
2.1 批处理推理
# 单条推理:低效
for post in candidates:
score = model.predict(user, history, post)
# 批量推理:高效(GPU 利用率更高)
batch_size = 32
for i in range(0, len(candidates), batch_size):
batch = candidates[i:i+batch_size]
scores = model.predict_batch(user, history, batch)
# 吞吐量提升:32x2.2 模型量化
# FP32 模型:4 bytes per parameter
model_fp32_size = 1B params * 4 bytes = 4 GB
inference_speed = 100 QPS
# INT8 量化:1 byte per parameter
model_int8_size = 1B params * 1 byte = 1 GB
inference_speed = 300 QPS
# 速度提升 3x,内存减少 75%3. 成本优化
3.1 分数缓存
# 缓存热门帖子的分数
cache = LRUCache(size=1_000_000)
def get_score(user_id, user_history_hash, post_id):
cache_key = (user_id, user_history_hash, post_id)
if cache_key in cache:
return cache[cache_key] # 缓存命中
score = model.predict(user, history, post)
cache[cache_key] = score
return score
# 缓存命中率:30-40%
# 计算成本降低:30-40%3.2 模型压缩
# Hash Embedding 压缩
traditional_embedding_size = 100M * 128 * 4 = 50 GB
hash_embedding_size = 4 * 10M * 128 * 4 = 20 GB
# 存储成本降低 60%
# 训练成本降低 50%(更少的参数更新)4. 可靠性优化
4.1 优雅降级
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
let mut candidates = Vec::new();
// Thunder 源失败 → 只用 Phoenix 源
match thunder_source.fetch(query).await {
Ok(thunder_candidates) => candidates.extend(thunder_candidates),
Err(e) => {
log::warn!("Thunder source failed, falling back to Phoenix only");
emit_metric("thunder_failure");
}
}
// Phoenix 源失败 → 只用 Thunder 源
match phoenix_source.fetch(query).await {
Ok(phoenix_candidates) => candidates.extend(phoenix_candidates),
Err(e) => {
log::warn!("Phoenix source failed, using Thunder only");
emit_metric("phoenix_failure");
}
}
candidates
}
// 单个源失败不影响整体服务
// 可用性从 99% → 99.99%4.2 超时保护
// Thunder 查询带超时
pub fn get_posts_by_users(
&self,
user_ids: &[i64],
timeout: Duration,
) -> Vec<LightPost> {
let start = Instant::now();
for user_id in user_ids {
if start.elapsed() > timeout {
log::warn!("Query timeout, returning partial results");
break; // 返回部分结果,而不是失败
}
// ... 查询逻辑
}
}
// 避免慢查询拖垮整个系统5. 开发效率优化
5.1 模块化设计
// 添加新功能无需修改核心代码
impl NewFeatureFilter { ... }
filters.push(Box::new(NewFeatureFilter));
// 开发周期:数周 → 数天5.2 可观测性
#[xai_stats_macro::receive_stats]
async fn score(&self, query: &Q, candidates: &[C]) {
// 自动记录:
// - 延迟(P50, P99)
// - 成功率
// - 候选数量分布
}
// 问题排查时间:数小时 → 数分钟完整的推荐流程
用户请求
↓
[1. 查询水化] 获取用户互动历史、关注列表等
↓
[2. 候选检索]
├─ Thunder: 关注账号的最新帖子(网内内容)
└─ Phoenix Retrieval: ML发现的相关帖子(网外内容)
↓
[3. 内容水化] 补充帖子元数据、作者信息、媒体信息
↓
[4. 预评分过滤] 移除:重复、过旧、自己的、已屏蔽、已静音等
↓
[5. ML 评分]
├─ Phoenix Scorer: Transformer 预测互动概率
├─ Weighted Scorer: 组合多个预测
├─ Author Diversity: 降低同一作者重复出现
└─ OON Scorer: 调整网外内容分数
↓
[6. 选择] 按分数排序,选择 Top-K
↓
[7. 后选择过滤] 内容审核(垃圾、暴力、违规等)
↓
个性化推荐流实现细节
过滤器系统
项目实现了 12+ 种过滤器,确保内容质量:
| 过滤器 | 作用 |
|---|---|
DropDuplicatesFilter | 去重 |
AgeFilter | 过滤过旧内容 |
SelfTweetFilter | 移除用户自己的帖子 |
AuthorSocialgraphFilter | 移除已屏蔽/静音作者 |
MutedKeywordFilter | 移除包含静音关键词的内容 |
PreviouslySeenPostsFilter | 移除已看过的内容 |
VFFilter | 内容审核(垃圾、暴力等) |
评分器链
顺序应用多个评分器:
- Phoenix Scorer:获取 ML 模型预测
- Weighted Scorer:组合成最终相关性分数
- Author Diversity Scorer:确保信息流多样性
- OON Scorer:调整网外内容权重
技术选型的智慧
- Rust for 服务层:内存安全、高性能、并发友好
- JAX for ML:JIT 编译、自动微分、GPU/TPU 加速
- Grok-1 架构:经过验证的大规模 Transformer 架构
- 内存存储(Thunder):亚毫秒级响应,无需数据库查询
- 异步管道:并行执行独立阶段,最大化吞吐量
业务场景与应用
典型业务场景
1. 社交媒体信息流推荐
场景描述:为 Twitter/X 这样的社交媒体平台提供个性化 “For You” 信息流。
业务需求:
- 每天为数亿用户生成个性化推荐
- 平衡关注账号(In-Network)和新发现内容(Out-of-Network)
- 实时响应(P99 延迟 < 200ms)
- 高度个性化(基于每个用户的独特互动历史)
系统解决方案:
用户刷新信息流
→ Home Mixer 接收请求
→ Thunder 提供关注账号的最新帖子(网内)
→ Phoenix Retrieval 发现相关帖子(网外)
→ Phoenix Ranker 统一排序所有候选
→ 返回个性化排序结果2. 内容冷启动问题
场景描述:新用户或新发布的帖子如何获得曝光?
解决方案:
- 新用户冷启动:通过双塔检索模型,即使用户历史很少,也能基于基本特征(如关注列表)找到相关内容
- 新帖子冷启动:Thunder 实时摄取新帖子,结合作者的历史表现和内容特征进行初始评分
3. 多样性与相关性平衡
场景描述:避免信息流被单一作者或话题主导。
解决方案:
- Author Diversity Scorer:降低同一作者重复出现的帖子分数
- OON Scorer:调整网外内容权重,确保探索与利用的平衡
- 多动作预测:不仅关注点赞,还考虑回复、分享等深度互动
4. 内容安全与合规
场景描述:过滤垃圾信息、暴力内容、违规内容。
解决方案:
- Pre-Scoring Filters:早期过滤明显违规内容,节省计算资源
- VF Filter (Visibility Filtering):后置过滤,使用专门的内容审核服务
- 负面信号预测:模型预测 P(block)、P(report) 等负面行为,主动降权
适用的其他业务场景
- 电商推荐:商品推荐、个性化首页
- 视频平台:短视频推荐、直播推荐
- 新闻聚合:个性化新闻推荐
- 音乐流媒体:歌曲、播放列表推荐
- 广告投放:精准广告匹配
如何使用
快速启动
环境准备
# 1. 安装 uv(现代 Python 包管理器)
pip install uv
# 或使用 curl 安装
curl -LsSf https://astral.sh/uv/install.sh | sh运行排序模型(Ranker)
cd phoenix
uv run run_ranker.py运行示例输出:
======================================================================
RECOMMENDATION SYSTEM DEMO
======================================================================
User has viewed 32 posts in their history
Ranking 8 candidate posts...
----------------------------------------------------------------------
RANKING RESULTS (ordered by predicted 'Favorite Score' probability)
----------------------------------------------------------------------
Rank 1:
Predicted engagement probabilities:
Favorite Score : ████████████████░░░░ 0.823
Reply Score : ██████░░░░░░░░░░░░░░ 0.312
Repost Score : ████████░░░░░░░░░░░░ 0.421
Quote Score : ███░░░░░░░░░░░░░░░░░ 0.156
Click Score : ██████████████░░░░░░ 0.712
Profile Click Score : ██░░░░░░░░░░░░░░░░░░ 0.089
Video View Score : ████████████░░░░░░░░ 0.634
Photo Expand Score : █████████░░░░░░░░░░░ 0.467
Share Score : ████░░░░░░░░░░░░░░░░ 0.201
Dwell Score : ███████████░░░░░░░░░ 0.578
Follow Author Score : █░░░░░░░░░░░░░░░░░░░ 0.034
Not Interested Score : ░░░░░░░░░░░░░░░░░░░░ 0.012
Block Author Score : ░░░░░░░░░░░░░░░░░░░░ 0.003
Mute Author Score : ░░░░░░░░░░░░░░░░░░░░ 0.005
Report Score : ░░░░░░░░░░░░░░░░░░░░ 0.001
Rank 2:
...
======================================================================
Demo complete!
======================================================================运行检索模型(Retrieval)
cd phoenix
uv run run_retrieval.py这将演示双塔模型如何从大量候选中检索出最相关的内容。
运行测试
cd phoenix
uv run pytest test_recsys_model.py test_recsys_retrieval_model.py -v集成到生产环境
1. 部署 Thunder 服务(内存存储)
# Thunder 需要连接到 Kafka 集群
# 配置 Kafka 连接
export KAFKA_BROKERS="kafka1:9092,kafka2:9092"
export KAFKA_TOPIC_POSTS="tweet_events"
# 启动 Thunder 服务
cargo run --release --bin thunderThunder 配置参数:
retention_seconds:帖子保留时长(默认 24 小时)request_timeout_ms:查询超时时间max_posts_per_author:每个作者最多缓存的帖子数
2. 部署 Phoenix 模型服务
# 加载模型权重
export MODEL_PATH="/path/to/phoenix_weights"
# 启动 Phoenix 排序服务
python -m phoenix.serve_ranker --port 8001
# 启动 Phoenix 检索服务
python -m phoenix.serve_retrieval --port 80023. 部署 Home Mixer 编排层
# 配置服务端点
export PHOENIX_RANKER_ENDPOINT="phoenix-ranker:8001"
export PHOENIX_RETRIEVAL_ENDPOINT="phoenix-retrieval:8002"
export THUNDER_ENDPOINT="thunder:9090"
# 启动 Home Mixer gRPC 服务
cargo run --release --bin home-mixer --port 90914. 客户端调用示例
// gRPC 客户端调用
let mut client = ScoredPostsServiceClient::connect("http://home-mixer:9091").await?;
let request = tonic::Request::new(ScoredPostsRequest {
user_id: 12345,
result_size: 50,
request_id: uuid::Uuid::new_v4().to_string(),
});
let response = client.get_scored_posts(request).await?;
let scored_posts = response.into_inner().posts;
// 渲染到用户界面
for post in scored_posts {
println!("Post ID: {}, Score: {}", post.post_id, post.score);
}自定义扩展
添加新的过滤器
use xai_candidate_pipeline::filter::Filter;
pub struct CustomFilter;
#[async_trait]
impl Filter<ScoredPostsQuery, PostCandidate> for CustomFilter {
async fn filter(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Result<Vec<bool>, String> {
// 自定义过滤逻辑
let keep_flags = candidates
.iter()
.map(|c| {
// 例如:只保留有图片的帖子
c.has_media.unwrap_or(false)
})
.collect();
Ok(keep_flags)
}
}
// 在 pipeline 中注册
filters.push(Box::new(CustomFilter));添加新的评分器
pub struct CustomScorer;
#[async_trait]
impl Scorer<ScoredPostsQuery, PostCandidate> for CustomScorer {
async fn score(
&self,
query: &ScoredPostsQuery,
candidates: &[PostCandidate],
) -> Result<Vec<PostCandidate>, String> {
let scored = candidates
.iter()
.map(|c| {
// 自定义评分逻辑
let custom_score = compute_custom_score(c);
PostCandidate {
custom_score: Some(custom_score),
..c.clone()
}
})
.collect();
Ok(scored)
}
}总结
这个开源项目展示了世界级推荐系统的设计精髓:
✅ 创新的机器学习架构:候选隔离注意力机制确保评分一致性 ✅ 工程卓越性:Rust + Python 混合架构发挥各自优势 ✅ 可扩展设计:模块化管道框架易于扩展和维护 ✅ 性能优化:内存存储、并行执行、评分缓存 ✅ 简化复杂度:消除手工特征工程,让模型自己学习
无论你是推荐系统工程师、机器学习研究员,还是对大规模系统感兴趣的开发者,这个项目都提供了宝贵的学习资源和最佳实践参考。
许可证:Apache License 2.0 项目地址:x-algorithm-main
希望这篇博客能帮你深入理解这个推荐算法项目!