一、是什么?
客戶端通過socket給服務端發(fā)送數(shù)據(jù),為了傳輸更有效率,會將多次間隔較小的且數(shù)據(jù)量小的數(shù)據(jù),通過nagle算法,合并成一個大的數(shù)據(jù)塊,然后進行封包。這樣做提高了效率,缺點就是你發(fā)送到服務端的數(shù)據(jù),服務端不知道是不是完整的,不知道哪幾小塊數(shù)據(jù)拼起來才是原來的數(shù)據(jù)。舉個例子:客戶端要發(fā)送原信息是A和B兩個數(shù)據(jù)包,服務端接收到之后,可能出現(xiàn)如下情況:
- 正常情況:讀取到了A和B兩個數(shù)據(jù)包;
- 粘包:A和B兩個數(shù)據(jù)包一起讀取了;
- 拆包:讀取了A數(shù)據(jù)包的一部分,A的另一部分和B數(shù)據(jù)包一起讀取了;
由于TCP是沒有消息保護邊界的,也就是上面的消息,沒有邊界,服務端并不知道hello的o是一個邊界,hello是一個單詞,所以我們就得中服務端處理邊界問題。這也就是粘包拆包問題。
二、Netty中粘拆包如何解決
- 使用自定義協(xié)議 + 編解碼器來解決。說人話就是:服務端你不是不知道消息的長度嗎?那我就讓客戶端發(fā)送的消息封裝成一個對象,對象包括消息長度和消息內(nèi)容,服務端讀取的時候通過對象就可以拿到每次讀取的長度了。
下面看具體案例:
- 封裝消息對象 MessageProtocol.java:
@Data
public class MessageProtocol {
private int len; // 長度
private byte[] content; // 發(fā)送的內(nèi)容
}
public class MessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MessageDecoder.decode 被調(diào)用");
// 將byte轉成MessageProtocol對象
MessageProtocol msg = new MessageProtocol();
int len = in.readInt();
byte[] content = new byte[len];
in.readBytes(content);
msg.setContent(content);
msg.setLen(len);
// 放入到out中傳遞給下一個handler處理
out.add(msg);
}
}
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MessageEncoder.encode被調(diào)用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
- 客戶端 --- NettyClient.java:
public class NettyClient {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建事件循環(huán)組
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 2. 創(chuàng)建啟動對象
Bootstrap bootstrap = new Bootstrap();
// 3. 設置相關參數(shù)
bootstrap.group(eventLoopGroup) // 設置線程組
.channel(NioSocketChannel.class) // 設置通道
.handler(new NettyClientInitializer());
// 4. 連接服務端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
// 5. 監(jiān)聽通道關閉
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
- 客戶端 --- NettyClientInitializer.java:
public class NettyClientInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new NettyClientHandler());
}
}
- 客戶端 --- NettyClientHandler.java:
public class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol>{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 發(fā)送10條數(shù)據(jù)
for (int i=0; i<5; i++) {
String msg = "hello " + i;
byte[] bys = msg.getBytes("utf-8");
int len = msg.getBytes("utf-8").length;
// 創(chuàng)建協(xié)議包
MessageProtocol message = new MessageProtocol();
message.setLen(len);
message.setContent(bys);
// 發(fā)送
ctx.writeAndFlush(message);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ch, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] bys = msg.getContent();
System.out.println("客戶端收到消息:長度 = " + len + ", 內(nèi)容 = " + new String(bys, Charset.forName("utf-8")));
}
}
public class NettyServer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認是cpu數(shù) * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 創(chuàng)建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 創(chuàng)建服務端啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置啟動參數(shù)
bootstrap.group(bossGroup, workGroup) // 設置兩個線程組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務器的通道
.childHandler(new NettyServerInitializer());
// 5. 啟動服務器并綁定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 6. 對關閉通道進行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
- 服務端 NettyServerInitializer.java:
public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new MessageDecoder());
sc.pipeline().addLast(new MessageEncoder());
sc.pipeline().addLast(new NettyServerHandler());
}
}
- 服務端 NettyServerHandler.java:
public class NettyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
// 接收數(shù)據(jù)并處理
int len = msg.getLen();
byte[] bys = msg.getContent();
System.out.println("服務端第" + (++count) + "次收到消息:長度 = " + len + ", 內(nèi)容 = " + new String(bys, Charset.forName("utf-8")));
// 給客戶端回復消息
String responseContent = UUID.randomUUID().toString();
byte[] rbys = responseContent.getBytes("utf-8");
int rlen = responseContent.getBytes("utf-8").length;
MessageProtocol rmsg = new MessageProtocol();
rmsg.setContent(rbys);
rmsg.setLen(rlen);
ctx.writeAndFlush(rmsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
怎么樣,你學廢了嗎?