1154 字
6 分钟
单机 Rust 架构处理 10 万+ 在线用户:深度分析与优化建议
本文基于 V2EX 帖子 《独立开发:单机 Rust 如何扛住 10 万+在线的行情推送?告别 Redis/Kafka 的”极简”架构》 进行深度分析和扩展。
原文核心策略回顾
作者分享了一个极简但高效的实时推送架构,主要策略包括:
| 策略 | 实现方式 | 效果 |
|---|---|---|
| 按需订阅 | 只推送用户正在查看的币对 | 带宽降低 95%(2GB/s → 100MB/s) |
| 列表数据优化 | Cloudflare 边缘缓存 + 1s TTL | 99% 流量卸载 |
| 进程内通信 | DashMap + tokio broadcast/mpsc | 纳秒级延迟 |
| 背压处理 | 128 容量 channel + try_send | 自动丢弃旧数据 |
| 零拷贝广播 | Arc | 10 万用户共享一份内存 |
优化建议与深度思考
1. 数据分层与压缩优化
现有问题:原文提到使用 JSON 格式推送,虽然简单但存在冗余。
优化方案:
// 方案一:增量更新(Delta Compression)
struct MarketDelta {
symbol: u16, // 用 ID 替代字符串
price_delta: i32, // 相对于基准价的差值
volume_delta: i32,
timestamp_offset: u16, // 相对于上一帧的时间偏移
}
// 方案二:二进制协议
// 使用 bincode/rkyv 替代 JSON,体积可减少 50-70%
let payload = rkyv::to_bytes::<_, 256>(&market_data)?;预期收益:
- 带宽再降 50-70%
- 解析 CPU 开销降低 80%
2. 智能背压策略升级
现有问题:简单的 try_send 丢弃策略可能导致用户体验跳跃。
优化方案:
// 分级背压策略
enum BackpressureLevel {
Normal, // 全量推送
Degraded, // 降低频率到 500ms
Critical, // 只推送价格变化 > 0.1% 的数据
Emergency, // 只推送用户持仓相关币对
}
async fn adaptive_send(
tx: &Sender<MarketData>,
data: MarketData,
level: &AtomicU8,
) {
match BackpressureLevel::from(level.load(Ordering::Relaxed)) {
Normal => { tx.try_send(data).ok(); }
Degraded => {
if data.timestamp % 500 == 0 {
tx.try_send(data).ok();
}
}
Critical => {
if (data.price_change_pct).abs() > 0.001 {
tx.try_send(data).ok();
}
}
Emergency => {
if user_holdings.contains(&data.symbol) {
tx.send(data).await.ok(); // 持仓数据保证送达
}
}
}
}3. 连接管理优化
现有问题:每个连接一个独立 task,10 万连接 = 10 万 task。
优化方案:
// 分组广播:将相同订阅的用户分组
struct SubscriptionGroup {
symbol: String,
connections: Vec<Arc<WebSocketSender>>,
last_data: Option<Arc<Bytes>>,
}
// 使用 io_uring 批量发送(Linux 5.1+)
async fn batch_broadcast(group: &SubscriptionGroup, data: Arc<Bytes>) {
// 单次系统调用发送给所有连接
let ring = IoUring::new(256)?;
for conn in &group.connections {
ring.push(conn.write_sqe(&data));
}
ring.submit_and_wait(group.connections.len())?;
}预期收益:
- Task 数量从 10 万降到几百(按订阅分组)
- 系统调用减少 99%
4. 内存池化
现有问题:频繁的 Arc<Bytes> 分配可能导致内存碎片。
优化方案:
use crossbeam::queue::ArrayQueue;
struct BytesPool {
pool: ArrayQueue<BytesMut>,
capacity: usize,
}
impl BytesPool {
fn acquire(&self) -> BytesMut {
self.pool.pop().unwrap_or_else(|| BytesMut::with_capacity(self.capacity))
}
fn release(&self, mut buf: BytesMut) {
buf.clear();
let _ = self.pool.push(buf);
}
}
// 使用 object_pool crate 更简洁
static POOL: Lazy<Pool<BytesMut>> = Lazy::new(|| {
Pool::new(1024, || BytesMut::with_capacity(512))
});5. 可观测性增强
现有问题:极简架构缺少监控,问题排查困难。
优化方案:
use metrics::{counter, gauge, histogram};
// 关键指标
gauge!("ws.connections.active").set(connection_count);
counter!("ws.messages.sent").increment(1);
histogram!("ws.latency.broadcast").record(latency_ms);
counter!("ws.backpressure.drops").increment(dropped_count);
// 分布式追踪(可选)
use tracing::{instrument, info_span};
#[instrument(skip(data))]
async fn broadcast_market_data(symbol: &str, data: Arc<Bytes>) {
let span = info_span!("broadcast", %symbol, size = data.len());
// ...
}6. 优雅降级与熔断
现有问题:评论中提到的高可用问题。
优化方案:
// 本地熔断器
struct CircuitBreaker {
failure_count: AtomicU32,
state: AtomicU8, // Closed, Open, HalfOpen
last_failure: AtomicU64,
}
impl CircuitBreaker {
async fn call<F, T, E>(&self, f: F) -> Result<T, E>
where
F: Future<Output = Result<T, E>>,
{
if self.is_open() {
return Err(/* circuit open error */);
}
match f.await {
Ok(v) => {
self.record_success();
Ok(v)
}
Err(e) => {
self.record_failure();
Err(e)
}
}
}
}架构对比
| 维度 | 原方案 | 优化后 |
|---|---|---|
| 带宽 | 100MB/s | ~30MB/s(二进制+增量) |
| Task 数 | 10 万 | 几百(分组广播) |
| 背压策略 | 全部丢弃 | 分级降级 |
| 内存分配 | 每消息分配 | 池化复用 |
| 可观测性 | 无 | 完整指标 |
| 故障恢复 | 重启丢失 | 熔断+快速恢复 |
何时需要 Redis/Kafka?
原文的”告别 Redis/Kafka”并非适用于所有场景:
| 场景 | 是否需要 |
|---|---|
| 单机 10 万连接、数据可丢失 | 不需要 |
| 多节点部署 | 需要(跨节点通信) |
| 数据持久化要求 | 需要 Kafka |
| 消息回溯需求 | 需要 Kafka |
| 缓存共享(Session 等) | 需要 Redis |
总结
原作者的架构思路非常值得学习:
- 按需推送是最大的优化——不发送不需要的数据
- CDN 边缘缓存对列表类数据效果显著
- Rust + Tokio 的性能天花板足够高
- 简单就是美——没有外部依赖意味着更少的故障点
但在生产环境中,建议补充:
- 二进制协议减少带宽
- 分级背压保证用户体验
- 分组广播减少 Task 数量
- 完善的监控和告警
- 优雅的故障降级策略
最后,技术选型没有银弹,适合自己业务场景的才是最好的。
参考链接:
单机 Rust 架构处理 10 万+ 在线用户:深度分析与优化建议
https://wsafight.github.io/personBlog/posts/rust-perf/