netty粘包和拆包原理(Netty提供的粘包拆包解决方案-LengthField)
LengthField方法就是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。主要用到的是LengthFieldBasedFrameDecoder和LengthFieldPrepender两个类。
LengthFieldBasedFrameDecoder和LengthFieldPrepender是一对,LengthFieldBasedFrameDecoder是解码器,LengthFieldPrepender是编码器。
解码器会按照参数指定的包长度偏移量数据对接收到的数据进行解码,从而得到目标消息体数据;而编码器则会在相应的数据前面添加指定的字节数据,这个字节数据中保存了当前消息体的整体字节数据长度。
解码器LengthFieldBasedFrameDecoder构造方法(常用)有5个参数:
1、maxFrameLength:定义了每个包所能传递的最大数据包大小。
2、lengthFieldOffset:定义了长度字段在字节码中的偏移量。
3、lengthFieldLength:定义了长度字段所占用的字节长度。
4、lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度。
5、initialBytesToStrip:忽略掉消息头以及长度字段占用的字节。对于长度字段在消息头中间的情况,可以通过这个参数来忽略掉。
编码器LengthFieldPrepender构造方法(常用)有1个参数:
1、lengthFieldLength:定义了长度字段所占用的字节长度。只允许使用1、2、3、4和8。
下面就用一个简单的例子来演示下。
客户端连接成功服务端后,直接发送个消息给服务端,服务端收到消息,打印下就直接将消息返回给客户端。
首先定义了消息体,包含有消息头和具体的内容:
@Data
@Builder
@ToString
public class Message implements Serializable {
private static final long serialVersionUID = -8714655485204643561L;
private MessageHead head;
private String content;
/**
* 消息头
*
*/
@Data
@Builder
@ToString
public static class MessageHead implements Serializable {
private static final long serialVersionUID = 582562216717718193L;
/**
* 协议版本号
*/
private int version;
/**
* 协议号
*/
private int cmd;
/**
* 内容长度
*/
private int contentLength;
}
}
下来定义测试服务端,监控端口3333:
@Slf4j
public class ServerTest {
public static void main(String[] args) {
start(3333);
}
private static void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//
.channel(NioServerSocketChannel.class)
//
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//
.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
//
.addLast(new LengthFieldPrepender(2))
//
.addLast(new MessageEncoder())
//
.addLast(new MessageDecoder())
//
.addLast(new ServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
f.addListener(future -> {
log.info("===============================================");
log.info("====== Start at port:" port " ===");
log.info("===============================================");
});
f.channel().closeFuture().sync();
} catch (Exception e) {
log.error("server exception stop:", e);
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
private static class ServerHandler extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
System.out.println("server: " msg.toString());
ctx.channel().writeAndFlush(msg);
}
}
}
定义客户端,
@Slf4j
public class ClientTest {
public static void main(String[] args) {
start("127.0.0.1", 3333);
}
private static void start(String host, int port) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
//
.channel(NioSocketChannel.class)
//
.option(ChannelOption.SO_KEEPALIVE, true)
//
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//
.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
//
.addLast(new LengthFieldPrepender(2))
//
.addLast(new MessageEncoder())
//
.addLast(new MessageDecoder())
//
.addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.addListener(future -> log.info("[success] Connect at {}:{}", host, port));
f.channel().closeFuture().sync();
} catch (Exception e) {
log.warn(e.toString());
} finally {
workerGroup.shutdownGracefully();
}
}
private static class ClientHandler extends SimpleChannelInboundHandler<Message> {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws InterruptedException {
String content = "test content";
Message msg = Message.builder()
.head(MessageHead.builder().cmd(1000).version(1).contentLength(content.length()).build())
.content(content).build();
ctx.channel().writeAndFlush(msg);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
System.out.println("client: " msg.toString());
}
}
}
最后,定义消息体编解码器:
// 编码器
@Slf4j
public class MessageEncoder extends MessageToByteEncoder<Message> {
public MessageEncoder() {
super();
}
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
if (null == message) {
log.info("Message is null!! [{}]", ctx.channel().remoteAddress());
return;
}
ByteBuf buf = Unpooled.buffer();
MessageHead header = message.getHead();
// 这里写入的顺序就是协议的顺序.
// 写入Header信息
buf.writeInt(header.getVersion());
buf.writeInt(header.getCmd());
buf.writeInt(message.getContent().length());
// 写入消息主体信息
buf.writeBytes(message.getContent().getBytes());
ctx.writeAndFlush(Unpooled.wrappedBuffer(buf));
}
}
// 解码器
@Slf4j
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
public MessageDecoder() {
super();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int canReadByteLen = in.readableBytes();
if (canReadByteLen <= 0) {
return;
}
// 获取协议的版本
int version = in.readInt();
// 获取协议号
int cmd = in.readInt();
// 获取消息内容长度
int contentLength = in.readInt();
// 协议头
MessageHead header = MessageHead.builder().version(version).cmd(cmd).contentLength(contentLength).build();
// 读取消息内容
byte[] contentBytes = new byte[contentLength];
in.readBytes(contentBytes);
String content = new String(contentBytes);
Message message = Message.builder().head(header).content(content).build();
out.add(message);
}
}
运行测试,先运行服务端,再运行客户端。
服务端运行结果:
客户端运行结果:
,
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com