Netty实现远程调用RPC功能
添加依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency> <dependency> <groupId>org.reflections</groupId> <artifactId>reflections</artifactId> <version>0.9.10</version> </dependency>
组织架构
服务端
封装类信息
public class ClassInfo implements Serializable { private static final long serialVersionUID = 1L; private String className; //类名 private String methodName;//方法名 private Class<?>[] types; //参数类型 private Object[] objects;//参数列表 public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getObjects() { return objects; } public void setObjects(Object[] objects) { this.objects = objects; } }
服务端网络处理服务器
public class NettyRPCServer { private int port; public NettyRPCServer(int port) { this.port = port; } public void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .localAddress(port).childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //编码器 pipeline.addLast("encoder", new ObjectEncoder()); //解码器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); //服务器端业务处理类 pipeline.addLast(new InvokeHandler()); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println("......server is ready......"); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyRPCServer(9999).start(); } }
服务器端业务处理类
public class InvokeHandler extends ChannelInboundHandlerAdapter { //得到某接口下某个实现类的名字 private String getImplClassName(ClassInfo classInfo) throws Exception{ //服务方接口和实现类所在的包路径 String interfacePath="com.lyz.server"; int lastDot = classInfo.getClassName().lastIndexOf("."); String interfaceName=classInfo.getClassName().substring(lastDot); Class superClass=Class.forName(interfacePath+interfaceName); Reflections reflections = new Reflections(interfacePath); //得到某接口下的所有实现类 Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass); if(ImplClassSet.size()==0){ System.out.println("未找到实现类"); return null; }else if(ImplClassSet.size()>1){ System.out.println("找到多个实现类,未明确使用哪一个"); return null; }else { //把集合转换为数组 Class[] classes=ImplClassSet.toArray(new Class[0]); return classes[0].getName(); //得到实现类的名字 } } @Override //读取客户端发来的数据并通过反射调用实现类的方法 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ClassInfo classInfo = (ClassInfo) msg; System.out.println(classInfo); Object clazz = Class.forName(getImplClassName(classInfo)).newInstance(); Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes()); //通过反射调用实现类的方法 Object result = method.invoke(clazz, classInfo.getObjects()); ctx.writeAndFlush(result); } }
服务端接口及实现类
// 无参接口 public interface HelloNetty { String hello(); } // 实现类 public class HelloNettyImpl implements HelloNetty { @Override public String hello() { return "hello,netty"; } } // 带参接口 public interface HelloRPC { String hello(String name); } // 实现类 public class HelloRPCImpl implements HelloRPC { @Override public String hello(String name) { return "hello," + name; } }
客户端
代理类
public class NettyRPCProxy { //根据接口创建代理对象 public static Object create(Class target) { return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //封装ClassInfo ClassInfo classInfo = new ClassInfo(); classInfo.setClassName(target.getName()); classInfo.setMethodName(method.getName()); classInfo.setObjects(args); classInfo.setTypes(method.getParameterTypes()); //开始用Netty发送数据 EventLoopGroup group = new NioEventLoopGroup(); ResultHandler resultHandler = new ResultHandler(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //编码器 pipeline.addLast("encoder", new ObjectEncoder()); //解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); //客户端业务处理类 pipeline.addLast("handler", resultHandler); } }); ChannelFuture future = b.connect("127.0.0.1", 9999).sync(); future.channel().writeAndFlush(classInfo).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } return resultHandler.getResponse(); } }); } }
客户端业务处理类
public class ResultHandler extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override //读取服务器端返回的数据(远程调用的结果) public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response = msg; ctx.close(); } }
客户端接口
// 无参接口 public interface HelloNetty { String hello(); } // 带参接口 public interface HelloRPC { String hello(String name); }写一个
测试类 服务调用方
public class TestNettyRPC { public static void main(String [] args){ //第1次远程调用 HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class); System.out.println(helloNetty.hello()); //第2次远程调用 HelloRPC helloRPC = (HelloRPC) NettyRPCProxy.create(HelloRPC.class); System.out.println(helloRPC.hello("RPC")); } }
输出结果
服务端
......server is ready...... com.lyz.serverStub.ClassInfo@2b894733 com.lyz.serverStub.ClassInfo@167bfa9
客户端
hello,netty hello,RPC
下一篇通过netty实现线上聊天功能
public class NettyRPCProxy {
//根据接口创建代理对象
public static Object create(Class target) {
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
//封装ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
//开始用Netty发送数据
EventLoopGroup group = new NioEventLoopGroup();
ResultHandler resultHandler = new ResultHandler();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//编码器
pipeline.addLast("encoder", new ObjectEncoder());
//解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//客户端业务处理类
pipeline.addLast("handler", resultHandler);
}
});
ChannelFuture future = b.connect("127.0.0.1", 9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
版权声明:本文为lyze原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。