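The previous post covered the RPC server implementation. The principle is to parse the data arriving on the Netty channel to obtain the class, method, and argument information, then invoke the local interface through Java reflection and return the result. Nothing very complicated is involved.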

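This post covers the client implementation. Put simply, the client has three jobs: first, establish a long-lived socket connection; second, package and send the data the server needs; third, handle the returned result.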

Demo repository

https://gitee.com/syher/grave-netty

RPC implementation

As on the server side, we define an annotation for scanning the service interfaces.

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({NettyClientScannerRegistrar.class, NettyClientApplicationContextAware.class})
public @interface NettyClientScan {

    String[] basePackages();

    Class<? extends NettyFactoryBean> factoryBean() default NettyFactoryBean.class;
}

  

This annotation goes on the Spring Boot application class; the basePackages parameter specifies the packages that contain the service interfaces.

@SpringBootApplication
@NettyClientScan(basePackages = {
        "com.braska.grave.netty.api.service"
})
public class GraveNettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(GraveNettyClientApplication.class, args);
    }

}

  

The NettyClientScannerRegistrar class registers the beans.

public class NettyClientScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // Scanner that registers bean definitions with the Spring registry
        NettyClientInterfaceScanner scanner = new NettyClientInterfaceScanner(registry);

        AnnotationAttributes annoAttrs =
                AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyClientScan.class.getName()));

        Class<? extends NettyFactoryBean> nettyFactoryBeanClass = annoAttrs.getClass("factoryBean");
        if (!NettyFactoryBean.class.equals(nettyFactoryBeanClass)) {
            scanner.setNettyFactoryBean(BeanUtils.instantiateClass(nettyFactoryBeanClass));
        }

        List<String> basePackages = new ArrayList<String>();
        for (String pkg : annoAttrs.getStringArray("basePackages")) {
            if (StringUtils.hasText(pkg)) {
                basePackages.add(pkg);
            }
        }

        scanner.doScan(StringUtils.toStringArray(basePackages));
    }
}

  

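The NettyClientInterfaceScanner class scans the interfaces under basePackages and rewrites their bean definitions so that each interface is backed by a JDK dynamic proxy.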

public class NettyClientInterfaceScanner extends ClassPathBeanDefinitionScanner {
    private NettyFactoryBean nettyFactoryBean = new NettyFactoryBean();


    @Override
    public Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);

        if (!beanDefinitions.isEmpty()) {
            processBeanDefinitions(beanDefinitions);
        }

        return beanDefinitions;
    }

    private void processBeanDefinitions(
            Set<BeanDefinitionHolder> beanDefinitions) {

        GenericBeanDefinition definition;

        for (BeanDefinitionHolder holder : beanDefinitions) {

            definition = (GenericBeanDefinition) holder.getBeanDefinition();
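            // Pass the scanned interface's class name as the constructor argument of the factory bean;
            // Spring converts the name to a Class when it calls NettyFactoryBean(Class<T>).
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            // nettyFactoryBean is the factory that produces the bean instance, not the bean itself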
            definition.setBeanClass(this.nettyFactoryBean.getClass());

            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        }
    }
}

  

The NettyFactoryBean class creates the proxy instance for each scanned interface.

public class NettyFactoryBean<T> implements FactoryBean<T> {
    private Class<T> nettyInterface;

    public NettyFactoryBean() {}

    public NettyFactoryBean(Class<T> nettyInterface) {
        this.nettyInterface = nettyInterface;
    }

    @Override
    public T getObject() throws Exception {
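        // Create the instance with a JDK dynamic proxy; every call is routed to the shared NettyInterfaceInvoker
        return (T) Proxy.newProxyInstance(nettyInterface.getClassLoader(), new Class<?>[]{nettyInterface}, NettyInterfaceInvoker.getInstance());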
    }

    @Override
    public Class<?> getObjectType() {
        return this.nettyInterface;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}
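
To make the role of getObject() more concrete, here is a minimal standalone sketch of the JDK dynamic proxy mechanism it relies on, with Spring and Netty left out. GreetingService, ProxyDemo, createProxy and describingHandler are hypothetical names used only for illustration, and the handler simply describes the intercepted call instead of sending anything over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for an interface under basePackages.
interface GreetingService {
    String greet(String name);
}

class ProxyDemo {

    // Same idea as NettyFactoryBean.getObject(): build a proxy for a single interface.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    // Handler that returns a description of the call: interface, method name, argument count.
    static InvocationHandler describingHandler() {
        return (proxy, method, args) -> {
            int argCount = (args == null) ? 0 : args.length; // args is null for no-arg methods
            return method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + "/" + argCount;
        };
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class, describingHandler());
        // No implementation of GreetingService exists; the handler answers the call.
        System.out.println(service.greet("world")); // prints GreetingService#greet/1
    }
}
```

The point to notice is that the interface never gets a hand-written implementation. Everything the caller invokes ends up in the handler, which is where the real client builds and sends its request.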

  

Now for the key part: the NettyInterfaceInvoker class packages the request data and sends it.

public class NettyInterfaceInvoker implements InvocationHandler {

    private RequestSender sender;
    // Singleton implemented with a static nested holder class
    private static class SINGLETON {
        private static final NettyInterfaceInvoker invoker = new NettyInterfaceInvoker();

        private static NettyInterfaceInvoker setSender(RequestSender sender) {
            invoker.sender = sender;
            return invoker;
        }
    }

    public static NettyInterfaceInvoker getInstance() {
        return SINGLETON.invoker;
    }

    public static NettyInterfaceInvoker setSender(RequestSender sender) {
        return SINGLETON.setSender(sender);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Package the request: class name, method name, parameters, and so on.
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(UUID.randomUUID().toString());
        // Send the request
        Object result = sender.send(request);
        Class<?> returnType = method.getReturnType();
        // Handle the returned data
        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode() == 1) {
            throw new Exception(response.getError());
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
            return response.getData();
        } else if (Collection.class.isAssignableFrom(returnType)) {
            return JSONArray.parseArray(response.getData().toString(), Object.class);
        } else if (Map.class.isAssignableFrom(returnType)) {
            return JSON.parseObject(response.getData().toString(), Map.class);
        } else {
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}
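
The flow inside invoke() can be tried out without Netty or a JSON library. The following is only a sketch under stated assumptions: SimpleRequest is a simplified stand-in for the Request class, Calculator and LocalCalculator are made-up services, and loopbackSender plays the server's role in the same process by invoking the implementation through reflection, which is roughly what the previous post described for the server side.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical service interface shared by "client" and "server".
interface Calculator {
    int add(int a, int b);
}

// Hypothetical server-side implementation.
class LocalCalculator implements Calculator {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

class RpcInvokeDemo {

    // Simplified stand-in for the article's Request class.
    static final class SimpleRequest {
        final String id = UUID.randomUUID().toString();
        final String className;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] parameters;

        SimpleRequest(Method method, Object[] args) {
            this.className = method.getDeclaringClass().getName();
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            this.parameters = args;
        }
    }

    // Client side: every call on the proxy becomes a SimpleRequest handed to the sender.
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, Function<SimpleRequest, Object> sender) {
        InvocationHandler handler =
                (proxy, method, args) -> sender.apply(new SimpleRequest(method, args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    // Server stand-in: find the implementation by interface name and invoke it reflectively.
    static Function<SimpleRequest, Object> loopbackSender(Map<String, Object> services) {
        return request -> {
            Object target = services.get(request.className);
            if (target == null) {
                throw new IllegalStateException("No service registered for " + request.className);
            }
            try {
                for (Class<?> iface : target.getClass().getInterfaces()) {
                    if (iface.getName().equals(request.className)) {
                        Method method = iface.getMethod(request.methodName, request.parameterTypes);
                        return method.invoke(target, request.parameters);
                    }
                }
                throw new IllegalStateException(
                        target.getClass() + " does not implement " + request.className);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Object> services = new HashMap<>();
        services.put(Calculator.class.getName(), new LocalCalculator());
        Calculator calculator = proxyFor(Calculator.class, loopbackSender(services));
        System.out.println(calculator.add(2, 3)); // prints 5
    }
}
```

Swapping loopbackSender for something that writes the request to a channel and waits for the reply gives the shape of the real client. The calling code does not change, since it only ever sees the Calculator interface.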

  

Next, let's look at how RequestSender handles the data.

public interface RequestSender {
    Channel connect(SocketAddress address) throws InterruptedException;

    Object send(Request request) throws InterruptedException;
}

  

RequestSender itself is only an interface. Its implementation class is:

public class NettyClientApplicationContextAware extends ChannelInitializer<SocketChannel>
        implements RequestSender, ApplicationContextAware, InitializingBean {
    private static final Logger logger = Logger.getLogger(NettyClientApplicationContextAware.class.getName());

    private String remoteAddress;
    private Bootstrap bootstrap;
    private EventLoopGroup group;
    private NettyChannelManager manager;
    private NettyClientHandler handler;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.remoteAddress = applicationContext.getEnvironment().getProperty("remoteAddress");
        this.bootstrap = new Bootstrap();
        this.group = new NioEventLoopGroup(1);
        this.bootstrap.group(group).
                channel(NioSocketChannel.class).
                option(ChannelOption.TCP_NODELAY, true).
                option(ChannelOption.SO_KEEPALIVE, true).
                handler(this);
        this.manager = new NettyChannelManager(this);
        this.handler = new NettyClientHandler(manager, remoteAddress);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Entry point for establishing the socket connection.
        this.manager.refresh(Lists.newArrayList(remoteAddress));
    }

    @Override
    public Object send(Request request) throws InterruptedException {
        Channel channel = manager.take();
        if (channel != null && channel.isActive()) {
            SynchronousQueue<Object> queue = this.handler.sendRequest(request, channel);
            Object result = queue.take();
            return JSONArray.toJSONString(result);
        } else {
            Response res = new Response();
            res.setCode(1);
            res.setError("Not connected to the server. Please check the configuration!");
            return JSONArray.toJSONString(res);
        }
    }

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 30));
        pipeline.addLast(new JSONEncoder());
        pipeline.addLast(new JSONDecoder());
        // Channel pipeline handler
        pipeline.addLast(this.handler);
    }

    @Override
    public Channel connect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        // Establish the long-lived connection, with reconnection on failure.
        future.addListener(new ConnectionListener(this.manager, this.remoteAddress));
        Channel channel = future.channel();//future.sync().channel();
        return channel;
    }

    public void destroy() {
        this.group.shutdownGracefully();
    }
}
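
ConnectionListener is not shown in this post, so the following is not the repository's code. It is a small sketch of the retry-with-delay idea behind reconnecting on failure, written with the JDK scheduler instead of a Netty event loop. Reconnector, its constructor parameters, and the BooleanSupplier used as a fake connection attempt are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class Reconnector {
    private final ScheduledExecutorService scheduler;
    private final long delayMillis;
    private final int maxAttempts;

    Reconnector(ScheduledExecutorService scheduler, long delayMillis, int maxAttempts) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
        this.maxAttempts = maxAttempts;
    }

    // Runs connectAttempt until it returns true or maxAttempts is reached.
    // The future completes with the number of attempts made, or fails if every attempt failed.
    CompletableFuture<Integer> connect(BooleanSupplier connectAttempt) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        attempt(connectAttempt, 1, result);
        return result;
    }

    private void attempt(BooleanSupplier connectAttempt, int attemptNo,
                         CompletableFuture<Integer> result) {
        boolean connected;
        try {
            connected = connectAttempt.getAsBoolean();
        } catch (RuntimeException e) {
            connected = false; // treat an exception like a failed connection
        }
        if (connected) {
            result.complete(attemptNo);
        } else if (attemptNo >= maxAttempts) {
            result.completeExceptionally(
                    new IllegalStateException("Gave up after " + attemptNo + " attempts"));
        } else {
            // Schedule the next attempt instead of blocking the current thread.
            scheduler.schedule(() -> attempt(connectAttempt, attemptNo + 1, result),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated server that only accepts the third connection attempt.
        int attempts = new Reconnector(scheduler, 50, 5)
                .connect(() -> calls.incrementAndGet() >= 3)
                .join();
        System.out.println(attempts); // prints 3
        scheduler.shutdown();
    }
}
```

The client in this post retries without an upper bound. The maxAttempts limit here is an addition of the sketch, mainly so that the example terminates.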

  

The NettyClientHandler class handles channel pipeline events. Unlike the server side, this handler extends ChannelInboundHandlerAdapter.

@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    private ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
    private NettyChannelManager manager;
    private String remoteAddress;

    public NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
        this.manager = manager;
        this.remoteAddress = remoteAddress;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("Disconnected from the Netty server: " + address);
        ctx.channel().close();
        manager.remove(ctx.channel());
        // Reconnect after the connection drops
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> {
            manager.refresh(Lists.newArrayList(remoteAddress));
        }, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
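        // Handle the data returned by the server
        Response response = JSON.parseObject(msg.toString(), Response.class);
        String requestId = response.getRequestId();
        // Responses with no waiting caller are ignored instead of causing a NullPointerException
        SynchronousQueue<Object> queue = (requestId == null) ? null : queueMap.remove(requestId);
        if (queue != null) {
            queue.put(response);
        }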
    }

    public SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
        // Use a blocking queue to hand the response back to the calling thread
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                // No reads or writes for 30 seconds: send a heartbeat
                logger.info("Sending heartbeat...");
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
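
The pairing of requests and responses through queueMap can be exercised on its own. Below is a minimal sketch, assuming a hypothetical PendingRequests class: the caller registers a SynchronousQueue under the request ID before sending, and the I/O side looks the queue up by ID when the response arrives. Unlike the handler above, which uses take() and put(), this version uses poll and offer with timeouts so that neither side can block forever; that is a design choice of the sketch, not something the post does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class PendingRequests {
    private final ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap =
            new ConcurrentHashMap<>();

    // Caller side: register before sending, so the response always finds its queue.
    SynchronousQueue<Object> register(String requestId) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(requestId, queue);
        return queue;
    }

    // Caller side: wait for the response; returns null on timeout or interruption.
    Object await(String requestId, SynchronousQueue<Object> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            queueMap.remove(requestId); // never leave a stale entry behind
        }
    }

    // I/O side: hand the response to the waiting caller; false if nobody is waiting.
    boolean complete(String requestId, Object response, long timeoutMillis) {
        SynchronousQueue<Object> queue = queueMap.remove(requestId);
        if (queue == null) {
            return false;
        }
        try {
            return queue.offer(response, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int pendingCount() {
        return queueMap.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        SynchronousQueue<Object> queue = pending.register("req-1");
        // Simulated I/O thread that delivers the response for req-1.
        new Thread(() -> pending.complete("req-1", "pong", 2000)).start();
        System.out.println(pending.await("req-1", queue, 2000)); // prints pong
        System.out.println(pending.complete("unknown", "late", 10)); // prints false
    }
}
```

A SynchronousQueue has no capacity, so put() on the receiving side blocks until the caller takes the value. In a Netty handler that means the event loop thread waits for the caller, which is one reason a timeout, or a CompletableFuture per request, may be worth considering.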

  

With that, the RPC client is complete. The key pieces are the Netty bootstrap and channel handler, the JDK dynamic proxy, and a blocking queue.

Together with the RPC server from the previous post, this makes a complete RPC framework.

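Of course, some parts are still handled rather roughly. If anything changes later, the code in the Git repository is authoritative.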

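Copyright notice: this is an original article by braska, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.
Article link: https://www.cnblogs.com/braska/p/12759062.html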