Netty 核心架构与原理
hardNettyNIOEventLoopPipelineChannelHandlerByteBuf
Netty 是 Java 生态中最流行的网络通信框架,封装了 NIO 的复杂性,提供了 Pipeline 编程模型。理解 Netty,就理解了 Dubbo、RocketMQ、Elasticsearch、Zookeeper 等几乎所有高性能中间件的网络层。
为什么需要 Netty?
直接使用原生 NIO 的痛点:
| 问题 | 说明 |
|---|---|
| API 复杂 | Selector、SelectionKey、Buffer 状态管理容易出错 |
| Bug 多 | 空轮询 Bug(JDK NIO 经典 Bug),需要 Workaround |
| 粘包/拆包 | TCP 是流式协议,需要自己处理消息边界 |
| 编解码 | 字节 ↔ 对象的转换需要自己实现 |
| 异常处理 | 各种网络异常需要妥善处理,否则内存泄漏 |
Netty 的价值:把这些都封装好了,只需关注业务逻辑。
Netty 核心架构图
┌──────────────────────────────────────────────────────┐
│ Netty Server │
│ │
│ ┌─────────────┐ ┌──────────────────────────┐ │
│ │ BossGroup │ │ WorkerGroup │ │
│ │ EventLoop │ │ EventLoop EventLoop │ │
│ │ (accept) │───连接►│ (read/write/业务) │ │
│ └─────────────┘ └──────────────────────────┘ │
│ │ │
│ Channel Pipeline │
│ ┌───────────────────────┐ │
│ │ ChannelHandler 链 │ │
│ │ Decoder → Handler → │ │
│ │ Encoder │ │
│ └───────────────────────┘ │
└──────────────────────────────────────────────────────┘
EventLoop:事件循环
核心思想
EventLoop 是 Netty 的核心线程模型:
- 一个 EventLoop 对应一个线程,一个线程独自负责一个或多个 Channel
- EventLoop 持续循环:等待 I/O 事件 → 处理 I/O 事件 → 执行队列中的任务
- Channel 与 EventLoop 绑定,Channel 的所有 I/O 操作在同一线程内完成,避免同步问题
// 伪代码:EventLoop 的核心循环
while (!terminated) {
// 1. 等待就绪事件(epoll_wait)
select(timeout);
// 2. 处理就绪的 I/O 事件
processSelectedKeys();
// 3. 执行队列中的任务(如 inbound/outbound 数据处理)
runAllTasks();
}
EventLoopGroup
// BossGroup:接收连接(通常 1-2 个线程)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// WorkerGroup:处理 I/O 和业务(默认:CPU 核数 × 2)
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4),
new MyBusinessHandler()
);
}
});
ChannelPipeline:责任链
Pipeline 是 Netty 处理消息的核心机制,基于责任链模式:
数据入站(读): Head → Handler1 → Handler2 → Handler3 → Tail
数据出站(写): Tail → Handler3 → Handler2 → Handler1 → Head
每个 ChannelHandler 只专注一件事,通过 pipeline().addLast() 串联:
pipeline.addLast(new HttpServerCodec()); // HTTP 编解码
pipeline.addLast(new HttpObjectAggregator(65536)); // 聚合完整 HTTP 请求
pipeline.addLast(new ChunkedWriteHandler()); // 支持大文件传输
pipeline.addLast(new MyHttpHandler()); // 业务逻辑
Inbound vs Outbound Handler
| 类型 | 接口/基类 | 触发时机 |
|---|---|---|
| Inbound | ChannelInboundHandlerAdapter |
读数据(网络 → 应用) |
| Outbound | ChannelOutboundHandlerAdapter |
写数据(应用 → 网络) |
| Both | ChannelDuplexHandler |
双向 |
// 典型 Inbound Handler
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
// 处理收到的数据
System.out.println("Received: " + buf.toString(CharsetUtil.UTF_8));
// 回写响应
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello!", CharsetUtil.UTF_8));
} finally {
buf.release(); // ⚠️ 必须释放,防止内存泄漏
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ByteBuf:Netty 的 Buffer
ByteBuf 是 Netty 替代 JDK ByteBuffer 的实现,解决了 ByteBuffer 的痛点:
| 对比 | ByteBuffer(JDK) | ByteBuf(Netty) |
|---|---|---|
| 读写模式切换 | 需要 flip() | 两个独立指针,不需要切换 |
| 扩容 | 不支持 | 自动扩容 |
| 内存管理 | GC 管理 | 引用计数,支持池化 |
| 复合 Buffer | 不支持 | CompositeByteBuf |
两个指针,读写分离
ByteBuf 内部结构:
│ 已废弃 │ 可读区 │ 可写区 │ 额外容量 │
│ │ │ │ │
0 readerIndex writerIndex capacity maxCapacity
ByteBuf buf = Unpooled.buffer(256); // 堆内非池化
// 写入
buf.writeBytes("Hello".getBytes());
buf.writeInt(42);
// 读取(不需要 flip!)
byte[] bytes = new byte[5];
buf.readBytes(bytes); // 读 5 字节
int n = buf.readInt(); // 读 int
内存类型
ByteBuf 类型:
├── 堆内存(HeapByteBuf):byte[],无 GC 外压,适合短生命周期
├── 直接内存(DirectByteBuf):OS 内存,I/O 操作更快,需要手动释放
└── 复合(CompositeByteBuf):多个 ByteBuf 逻辑合并,无内存拷贝
引用计数与内存泄漏
ByteBuf buf = ctx.alloc().buffer(); // 引用计数 = 1
buf.retain(); // 引用计数 = 2
buf.release(); // 引用计数 = 1
buf.release(); // 引用计数 = 0,内存释放
// ⚠️ 常见错误:pipeline 中间 handler 没有释放
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// 如果不传递给下一个 handler,必须手动释放
buf.release(); // 或者 ReferenceCountUtil.release(msg);
}
最佳实践:继承 SimpleChannelInboundHandler<T> 而不是 ChannelInboundHandlerAdapter:
// SimpleChannelInboundHandler 会自动释放 msg
public class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 不需要手动 release,框架自动处理
process(msg);
}
}
粘包与拆包
TCP 是流式协议,没有消息边界,需要应用层处理:
发送 3 条消息:│M1│M2│M3│
收到可能是:
粘包:│M1M2│ │M3│
拆包:│M1│ │M2M│ │3│
Netty 内置解码器解决:
// 固定长度
new FixedLengthFrameDecoder(1024)
// 以分隔符结束
new LineBasedFrameDecoder(65536) // \n 或 \r\n
new DelimiterBasedFrameDecoder(65536, Delimiters.lineDelimiter())
// 长度字段协议(最通用:header 里有 length 字段)
new LengthFieldBasedFrameDecoder(
65536, // 最大帧长度
0, // length 字段偏移量
4, // length 字段字节数
0, // 长度调整
4 // 初始剥离字节数(即去掉 length 字段本身)
)
Netty 解决 JDK NIO 的 Epoll Bug
JDK NIO 有个著名的 epoll 空轮询 Bug:select() 在某些情况下不阻塞就返回,导致死循环,CPU 100%。
Netty 的解决方案:计数 + 重建 Selector
// Netty 源码逻辑(简化)
int selectCnt = 0;
while (true) {
int selected = selector.selectNow();
selectCnt++;
if (selected > 0 || hasTasks()) {
selectCnt = 0; // 正常,跑业务
} else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 默认 512
// 判定为空轮询 Bug,重建 Selector
rebuildSelector();
selectCnt = 0;
}
}
生产环境核心踩坑点
| 问题 | 答案要点 |
|---|---|
| Netty 的线程模型? | BossGroup accept 连接,WorkerGroup 处理 I/O;EventLoop 单线程循环 |
| ChannelPipeline 的原理? | 责任链模式,Inbound 正向,Outbound 反向 |
| ByteBuf 相比 ByteBuffer 的优点? | 读写双指针无需 flip;自动扩容;引用计数池化内存 |
| 如何解决粘包拆包? | LengthFieldBasedFrameDecoder 等内置解码器 |
| Netty 如何解决 JDK NIO 的空轮询 Bug? | 计数检测,超过阈值(512)重建 Selector |
| SimpleChannelInboundHandler 和 ChannelInboundHandlerAdapter 的区别? | 前者自动 release msg,适合终点 Handler;后者需手动管理 |