|
Google的Protobuf在業(yè)界非常流行,很多商業(yè)項目都選擇Protobuf作為編解碼框架,以下為Protobuf的一些優(yōu)點:
(1)在谷歌內(nèi)長期使用,產(chǎn)品成熟度高。
(2)跨語言,支持包括C++、Java、Python在內(nèi)的多重語言。
(3)編碼后的碼流小,便于存儲和傳輸。
(4)編解碼性能高。
(5)支持不同協(xié)議向前兼容。
(6)支持定義可選和必選字段。
一、Protobuf開發(fā)環(huán)境搭建
1、下載Protobuf的Windows版,網(wǎng)址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip
2、下載Protobuf Java語言所需的jar包,網(wǎng)址如下:http://repo2./maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar。
3、新建請求響應(yīng)所需的proto文件
SubscribeReq.proto
- package netty;
- option java_package = "com.serial.java.protobuf";
- option java_outer_classname = "SubscribeReqProto";
-
- message SubscribeReq{
- required int32 subReqID = 1;
- required string userName = 2;
- required string productName = 3;
- repeated string address = 4;
- }
SubscribeRespProto.proto
- package netty;
- option java_package = "com.serial.java.protobuf";
- option java_outer_classname = "SubscribeRespProto";
- message SubscribeResp{
- required int32 subReqID = 1;
- required string respCode = 2;
- required string desc = 3;
- }
4、通過Protoc.exe生成所需的Java編解碼POJO文件,命令行如下。
- C:\Users\Administrator>d:
- D:\>cd "Program Files\protoc-2.6.1-win32"
- D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
- eReq.proto
- D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
- eResp.proto
- D:\Program Files\protoc-2.6.1-win32>
5、將生成的Java POJO文件拷貝到項目中,注意Protobuf所需的jar包也需包含在項目中,不然會報錯。
6、創(chuàng)建測試類,測試Protobuf的編解碼功能。
TestSubscribeReq.java
- package com.serial.java.test;
-
- import java.util.ArrayList;
- import java.util.List;
-
- import com.google.protobuf.InvalidProtocolBufferException;
- import com.serial.java.protobuf.SubscribeReqProto;
-
- public class TestSubscribeReq {
-
- private static byte [] encode(SubscribeReqProto.SubscribeReq req){
- return req.toByteArray();
- }
-
- private static SubscribeReqProto.SubscribeReq decode(byte [] body)
- throws InvalidProtocolBufferException{
- return SubscribeReqProto.SubscribeReq.parseFrom(body);
- }
-
- private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
- SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
- builder.setSubReqID(1);
- builder.setUserName("leeka");
- builder.setProductName("Netty book");
-
- List<String> address = new ArrayList<String>();
- address.add("Nanjing");
- address.add("Beijing");
- address.add("Hangzhou");
- builder.addAllAddress(address);
- return builder.build();
- }
-
-
- public static void main(String[] args)throws Exception {
- SubscribeReqProto.SubscribeReq req = createSubscribeReq();
- System.out.println("before encode:"+ req.toString());
- SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
- System.out.println("after encode:"+ req2.toString());
- System.out.println("Assert equal: " + req2.equals(req));
-
- }
-
- }
7、運行測試類,查看測試結(jié)果,控制臺輸出如下信息:
- before encode:subReqID: 1
- userName: "leeka"
- productName: "Netty book"
- address: "Nanjing"
- address: "Beijing"
- address: "Hangzhou"
-
- after encode:subReqID: 1
- userName: "leeka"
- productName: "Netty book"
- address: "Nanjing"
- address: "Beijing"
- address: "Hangzhou"
-
- Assert equal: true
二、Netty的Protobuf服務(wù)端和客戶端開發(fā)
服務(wù)端入口
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeReqProto;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.protobuf.ProtobufDecoder;
- import io.netty.handler.codec.protobuf.ProtobufEncoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
-
-
- public class SubReqServer {
-
- public void bind(int port)throws Exception{
-
- //配置服務(wù)端NIO線程組
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try{
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ProtobufVarint32FrameDecoder())
- .addLast(new ProtobufDecoder(
- SubscribeReqProto.SubscribeReq.getDefaultInstance()))
- .addLast(new ProtobufVarint32LengthFieldPrepender())
- .addLast(new ProtobufEncoder())
- .addLast(new SubReqServerHandler());
- }
-
- });
- //綁定端口,同步等待成功
- ChannelFuture f = b.bind(port).sync();
- //等待服務(wù)端監(jiān)聽端口關(guān)閉
- f.channel().closeFuture().sync();
-
- }finally{
- //退出時釋放資源
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
- public static void main(String[] args) throws Exception{
- int port = 8085;
- if(args!=null && args.length > 0){
- port = Integer.valueOf(args[0]);
- }
- new SubReqServer().bind(port);
- }
- }
服務(wù)端處理類
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeReqProto;
- import com.serial.java.protobuf.SubscribeRespProto;
-
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
-
- public class SubReqServerHandler extends ChannelHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;
- //System.out.println("SubReqServerHandler channelRead:"+ req.getUserName());
- if("leeka".equalsIgnoreCase(req.getUserName())){
- System.out.println("service accept client subscribe req:["+ req +"]");
- ctx.writeAndFlush(resp(req.getSubReqID()));
- }
- }
-
- private SubscribeRespProto.SubscribeResp resp(int subReqID){
- SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
- builder.setSubReqID(subReqID);
- builder.setRespCode("0");
- builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
- return builder.build();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- }
客戶端入口
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeRespProto;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.protobuf.ProtobufDecoder;
- import io.netty.handler.codec.protobuf.ProtobufEncoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
- public class SubReqClient {
-
- public void connect(int port,String host)throws Exception{
-
- //配置客戶端NIO線程組
- EventLoopGroup group = new NioEventLoopGroup();
-
- try{
- Bootstrap b = new Bootstrap();
- b.group(group).channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ProtobufVarint32FrameDecoder())
- .addLast(new ProtobufDecoder(
- SubscribeRespProto.SubscribeResp.getDefaultInstance()))
- .addLast(new ProtobufVarint32LengthFieldPrepender())
- .addLast(new ProtobufEncoder())
- .addLast(new SubReqClientHandler());
- };
- });
-
- //發(fā)起異步連接操作
- ChannelFuture f = b.connect(host,port).sync();
- //等待客戶端鏈路關(guān)閉
- f.channel().closeFuture().sync();
- }finally{
- //退出,釋放資源
- group.shutdownGracefully();
- }
-
- }
-
- public static void main(String[] args)throws Exception {
- int port = 8085;
- if(args!=null && args.length > 0){
- port = Integer.valueOf(args[0]);
- }
- new SubReqClient().connect(port, "127.0.0.1");
- }
- }
客戶端處理類
- package com.serial.java;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.logging.Logger;
-
- import com.serial.java.protobuf.SubscribeReqProto;
-
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
-
- public class SubReqClientHandler extends ChannelHandlerAdapter {
-
- private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());
-
- public SubReqClientHandler() {
-
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.write(req(i));
- }
- ctx.flush();
- }
-
- private SubscribeReqProto.SubscribeReq req(int i){
- SubscribeReqProto.SubscribeReq.Builder r = SubscribeReqProto.SubscribeReq.newBuilder();
- r.setSubReqID(i);
- r.setProductName("Netty Book"+i);
- r.setUserName("leeka");
-
- List<String> address = new ArrayList<String>();
- address.add("Nanjing");
- address.add("Beijing");
- r.addAllAddress(address);
- return r.build();
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- //super.channelReadComplete(ctx);
- ctx.flush();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- System.out.println("receive server response:["+msg+"]");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- logger.warning("unexpected exception from downstream:"+ cause.getMessage());
- ctx.close();
- }
-
- }
OVER
|