Hadoop系列番外篇之一文搞懂Hadoop RPC框架及细节实现
@
Hadoop RPC 框架解析
网络通信模块是分布式系统中最底层的模块。它直接支撑了上层分布式环境下复杂的进程间通信(Inter-Process Communication, IPC)逻辑,是所有分布式系统的基础。远程过程调用(Remote Procedure Call, RPC)是一种常用的分布式网络通信协议。它允许运行于一台计算机的程序调用另一台计算机的子程序,同时将网络的通信细节隐藏起来,使得用户无须额外地为这个交互作用编程。
作为一个分布式系统,Hadoop实现了自己的RPC通信协议,它是上层多个分布式子系统(如MapReduce, HDFS, HBase等)公用的网络通信模块。本文主要从框架设计和实现方面介绍Hadoop RPC,还有该RPC框架在MapReduce中的应用。
1.Hadoop RPC框架概述
1.1 RPC框架特点
RPC实际上是分布式计算中客户机/服务器(Client/Server)模型的一个应用实例。对于Hadoop RPC而言,它具有以下几个特点。
1.透明性:这是所有RPC框架的最根本特征,即当用户在一台计算机的程序调用另外一台计算机上的子程序时,用户自身不应感觉到其间涉及跨机器间的通信,而是感觉像是在执行一个本地调用。
2.高性能:Hadoop各个系统(如HDFS, MapReduce)均采用了Master/Slave结构。其中,Master实际上是一个RPC server,它负责处理集群中所有Slave发送的服务请求。为了保证Master的并发处理能力,RPC server应是一个高性能服务器,能够高效地处理来自多个Client的并发RPC请求。
3.可控性:JDK中已经自带了一个RPC框架——RMI(Remote Method Invocation,远程方法调用)。之所以不直接使用该框架,主要是因为考虑到RPC是Hadoop最底层、最核心的模块之一,保证其轻量级、高性能和可控性显得尤为重要,而RMI过于重量级且用户可控之处太少(如网络连接、超时和缓冲等均难以定制或者修改)。
1.2 Hadoop RPC框架
与其他RPC框架一样,Hadoop RPC主要分为四个部分,分别是序列化层、函数调用层、网络传输层和服务器端处理框架,具体实现机制如下:
序列化层:序列化层的主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储。在RPC框架中,它主要用于将用户请求中的参数或者应答转化成字节流以便跨机器传输。Hadoop自己实现了序列化框架,一个类只要实现Writable接口,即可支持对象序列化与反序列化。
函数调用层:函数调用层的主要功能是定位要调用的函数并执行该函数。HadoopRPC采用Java反射机制与动态代理实现了函数调用。
网络传输层:网络传输层描述了Client与Server之间消息传输的方式,Hadoop RPC采用了基于TCP/IP的Socket机制。
服务器端处理框架:服务器端处理框架可被抽象为网络I/O模型。它描述了客户端与服务器端间信息交互的方式。它的设计直接决定着服务器端的并发处理能力。常见的网络I/O模型有阻塞式I/O、非阻塞式I/O、事件驱动I/O等,而Hadoop RPC采用了基于Reactor设计模式的事件驱动I/O模型。
Hadoop RPC总体架构自下而上可分为两层。
第一层是一个基于Java NIO(New IO)实现的客户机/服务器(Client/Server)通信模型。其中,客户端将用户的调用方法及其参数封装成请求包后发送到服务器端。服务器端收到请求包后,经解包、调用函数、打包结果等一系列操作后,将结果返回给服务器端。为了增强Server端的扩展性和并发处理能力,Hadoop RPC采用了基于事件驱动的Reactor设计模式,在具体实现时,用到了JDK提供的各种功能包,主要包括java.nio(NIO)、java.lang.reflect(反射机制和动态代理)、java.net(网络编程库)等。
第二层是供更上层程序直接调用的RPC接口,这些接口底层即为客户机/服务器通信模型。
看到这里有些小伙伴说我对于这些Java基础知识都不是很记得了,没关系,暖男的我现在就和大家一起来看看相关的这些Java基础内容。又有一些小伙伴说我没学过啊,那也没关系,我们知识大致去了解一些类和这些类有哪些方法可以帮助我们理解RPC就够了。我们使用Hadoop的时候不也不必关注RPC的细节么,那Java细节不会影响我们使用。对Java基础反射、网络编程和NIO很熟悉的小可爱可以直接跳过第二章节
2.Java基础知识回顾
我们简要介绍Hadoop RPC中用到的JDK开发工具包中的一些类。了解和掌握这些类的功能和使用方法是深入学习Hadoop RPC的基础。这些类主要来自以下三个Java包:java.lang.reflect(反射机制和动态代理相关类)、java.net(网络编程库)和java.nio(NIO)。
2.1 Java反射机制与动态代理
反射机制是Java语言的一个重要特性,它的重要性也不用多说,在很多的框架中,反射撑起了半边天。简言之其作用:允许用户动态获取类的信息和动态调用对象的方法。
我们先来看看它提供的主要的类和类对应的功能:
类名&接口 | 功能描述 |
---|---|
Class | 代表一个Java类 |
Field | 代表Java类的属性 |
Method | 代表Java类的方法 |
Constructor | 代表Java类的构造函数 |
Array | 提供了动态创建数组,以及访问数组元素的静态方法 |
Proxy类以及InvocationHandler接口 | 提供了动态生成代理类以及实例的方法 |
我们重点关注Java动态代理。在动态代理之前,我们先一起回顾一下代理概念及代理模式。有小可爱说不知道动态代理我只听过名字啊,具体是个什么,我不知道呀。没关系,我先简单说一下动态代理的核心思想:是为其他对象提供一种代理以控制对这个对象的访问。代理类负责为委托类进行预处理(如安全检查,权限检查等)或者执行完后的后续处理(如转发给其他代理等)。动态代理的好处就是开发人员通过简单的指定一组接口及委托类对象,便能动态地获得代理类,这大大简化了编写代理类的步骤。
2.1.1 代理关键类&接口信息
在此我们先来了解一下代理的一些关键类&接口以及其主要方法:
1)java. lang.reflect.Proxy
这是Java动态代理机制的主类,它提供了一组静态方法,用于为一组接口动态地生成代理类及其对象。
// Returns the invocation handler for the specified proxy instance.
// Params:proxy – the proxy instance to return the invocation handler for
// Returns:the invocation handler for the proxy instance
// 获取指定代理对象所关联的调用处理器
public static InvocationHandler getInvocationHandler(Object proxy)
// 获取关联于指定类装载器和一组接口的动态代理类的对象
public static Class<?> getProxyClass(ClassLoader loader, Class<?>... interfaces)
// 判断指定的类是不是一个动态代理类
public static boolean isProxyClass(Class<?> cl)
// 为指定类装载器一组接口及调用处理器生成动态代理类实例
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
2)java. lang.reflect.InvocationHandler
这是调用处理器接口。它定义了一个invoke方法,用于处理在动态代理类对象上的方法调用。通常开发人员需实现该接口,并在invoke方法中实现对委托类的代理访问。
// 该方法负责处理动态代理类上的所有方法调用
// 参数:代理类实例,被调用的方法对象,调用参数
// 调用处理器根据这三个参数进行预处理或分派到委托类实例上执行
public Object invoke(Object proxy, Method method, Object[] args)
2.1.2 动态代理创建对象的过程
一个典型的动态代理创建对象的过程可分为以下4个步骤:
- 步骤1 通过实现InvocationHandler接口创建自己的调用处理器:
- 步骤2 通过为Proxy类指定ClassLoader对象和一组interface创建动态代理类:
- 步骤3 通过反射机制获取动态代理类的构造函数,其参数类型是调用处理器接口类型:
- 步骤4 通过构造函数创建动态代理类实例,此时需将调用处理器对象作为参数
被传入:
// step 1通过实现InvocationHandler接口创建自己的调用处理器
InvocationHandler handler=new InvocationHandlerImpl(...);
// 2 通过为Proxy类指定ClassLoader对象和一组interface创建动态代理类
Class clazz=Proxy.getProxyClass(classLoader, new Class[]{……});
// 3 通过反射机制获取动态代理类的构造函数,其参数类型是调用处理器接口类型
Constructor constructor=clazz.getConstructor(new Class[]{InvocationHandler.class});
// 4通过构造函数创建动态代理类实例,此时需将调用处理器对象作为参数
Interface Proxy=(Interface)constructor.newInstance(new Object[]{handler});
Proxy类中的newInstance方法封装了步骤2~步骤4,只需两步即可完成代理对象的创建。
我们通过一个动态代理的例子来加深对于动态代理的理解;
目录结构如下
定义一个接口协议
实现接口协议 Server类
实现调用处理器接口
测试用例
2.2 Java网络编程
通常,Java网络程序建立在TCP/IP协议基础上,致力于实现应用层。传输层向应用层提供了套接字Socket接口,它封装了下层的数据传输细节;应用层的程序可通过Socket与远程主机建立连接和进行数据传输。
JDK提供了3种套接字类:java.net.Socket、java.net.ServerSocket和java.net.DatagramSocket。其中,java.net.Socket和java.net.ServerSocket类建立在TCP协议基础上,而java.net.DatagramSocket类则建立在UDP协议基础上。Java网络程序均采用客户机/服务器通信模式。下面介绍如何使用java.net.Socket和java.net.ServerSocket编写客户端和服务器端程序。
编写一个客户端程序需要以下3个步骤
步骤1 创建客户端Socket:
其中,serverHost为服务器端的host, port为服务器端的监听端口号。一旦Socket创建成功,则表示客户端连接服务器成功。
Socket soc=new Socket(serverHost, port);
步骤2 创建输出、输入流以向服务器端发送数据和从服务器端接收数据:
//构造数据输入流,用以接收数据
DataInputStream in=new DataInputStream(soc.getInputStream());
//构造数据输出流,用以发送数据
DataOutputStream out=new DataOutputStream(soc.getOutputStream());
……
//应用程序发送和接收数据
步骤3 断开连接:
soc.close();
编写一个服务器端程序需要以下4个步骤:
步骤1 创建ServerSocket对象:
ServerSocket serverSocket=new ServerSocket(port);
其中,port为服务器端的监听端口号。当客户端向服务器端建立连接时,需要知道该端口号。创建ServerSocket对象成功后,操作系统将把当前进程注册为服务器进程。
步骤2 监听端口号,等待新连接到达:
Socket soc=serverSocket.accept();
运行函数accept()后,ServerSocket对象会一直处于监听状态,等待客户端的连接请求。一旦有客户端请求到达,该函数会返回一个Socket对象,该Socket对象与客户端Socket对象形成一条通信链路。
步骤3 创建输出、输入流以向客户端发送数据和从客户端接收数据。此处的程序和客户端的一样,故不再赘述。
步骤4 断开连接。此处的程序和客户端的一样,故不再赘述。
在Client/Server模型中,Server往往需要同时处理大量来自Client的访问请求,因此Server端需采用支持高并发访问的架构。一种简单而又直接的解决方案是“one thread-perconnection”。这是一种基于阻塞式I/O的多线程模型,如下图所示。在该模型中,Server为每个Client连接创建一个处理线程,每个处理线程阻塞式等待可能到达的数据,一旦数据到达,则立即处理请求、返回处理结果并再次进入等待状态。由于每个Client连接有一个单独的处理线程为其服务,因此可保证良好的响应时间。但当系统负载增大(并发请求增多)时,Server端需要的线程数会增加,这将成为系统扩展的瓶颈所在。
2.3 Java NIO
2.3.1 简介
从J2SE 1.4版本以来,JDK发布了全新的I/O类库,简称NIO(New IO)。它不但引入了全新的高效的I/O机制,同时引入了基于Reactor设计模式的多路复用异步模式。NIO的包中主要包含了以下几种抽象数据类型:
Channel(通道):NIO把它支持的I/O对象抽象为Channel。它模拟了通信连接,类似于原I/O中的流(Stream),用户可以通过它读取和写入数据。目前已知的实例类有SocketChannel、ServerSocketChannel、DatagramChannel、FileChannel等。
Buffer(缓冲区):Buffer是一块连续的内存区域,一般作为Channel收发数据的载体出现。所有数据都通过Buffer对象来处理。用户永远不会将字节直接写入通道中,相反,需将数据写入包含一个或者多个字节的缓冲区;同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
Selector(选择器):Selector类提供了监控一个或多个通道当前状态的机制。只要Channel向Selector注册了某种特定事件,Selector就会监听这些事件是否会发生,一旦发生某个事件,便会通知对应的Channel。使用选择器,借助单一线程,就可对数量庞大的活动I/O通道实施监控和维护,如下图所示:
2.3.2 常用类
1)Buffer相关类
所有缓冲区包含以下3个属性:
capacity:缓冲区的末位值。它表明了缓冲区最多可以保存多少数据;
limit:表示缓冲区的当前存放数据的终点。不能对超过limit的区域进行读写数据;
position:下一个读写单元的位置。每次读写缓冲区时,均会修改该值,为下一次读写数据做准备。
这三个属性的大小关系是capacity≥limit≥position≥0
Buffer有两种不同的工作模式——写模式和读模式。在写模式下,limit与capacity相同,position随着写入数据增加,逐渐增加到limit,因此,0到position之间的数据即为已经写入的数据;在读模式下,limit初始指向position所在位置,position随着数据的读取,逐渐增加到limit,则0到position之间的数据即为已经读取的数据。
2)Channel相关类
java. nio提供了多种Channel实现,其中,最常用的是以SelectableChannel为基类的通道。SelectableChannel是一种支持阻塞I/O和非阻塞I/O的通道,它的主要方法如下:
- SelectableChannel configureBlocking(boolean block)throws IOException。
- 作用:设置当前SelectableChannel的阻塞模式。
- 参数含义:block表示是否将SelectableChannel设置为阻塞模式。
- 返回值:SelectableChannel对象本身的引用,相当于“return this”。
- SelectionKey register(Selector sel, int ops)throws ClosedChannelException。
- 作用:将当前Channel注册到一个Selector中。
- 参数含义:sel表示要注册的Selector;ops表示注册事件。
- 返回值:与注册Channel关联的SelectionKey对象,用于跟踪被注册事件。
SelectableChannel的两个子类是ServerSocketChannel和SocketChannel,它们分别是ServerSocket和Socket的替代类。
ServerSocketChannel主要用于监听TCP连接,
SocketChannel可看作Socket的替代类,但功能比Socket更加强大。同ServerSocket-Channel类似,它提供了静态工厂方法open()(创建对象)和socket()方法(返回与SocketChannel关联的Socket对象)。
3)Selector类
Selector可监听ServerSocketChannel和SocketChannel注册的特定事件,一旦某个事件发生,则会通知对应的Channel。SelectableChannel的register()方法负责注册事件,该方法返回一个SelectionKey对象,该对象即为用于跟踪这些注册事件的句柄。
Selector中常用的方法如下。
- static Selector open():一个静态工厂方法,可用于创建Selector对象。
- int select(long timeout):该方法等待并返回发生的事件。一旦某个注册的事件发生,就会返回对应的SelectionKey的数目,否则,一直处于阻塞状态,直到以下四种情况之一发生:
- 至少一个事件发生;
- 其他线程调用了Selector的wakeup()方法;
- 当前执行select()方法的线程被中断;
- 超出等待时间timeout,如果不设置等待时间,则表示永远不会超时。
- set selectedKeys():Selector捕获的已经发生事件对应的SelectionKey集合。
- Selector wakeup():立刻唤醒当前处于阻塞状态的Selector。常见应用场景是,线程A调用Selector对象的select()方法,阻塞等待某个注册事件发生,线程B通过调用wakeup()函数可立刻唤醒线程A,使其从select()方法中返回。
4)SelectionKey类
ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,用于跟踪注册事件。在SelectionKey中定义了4种事件,分别用以下4个整型常量表示:
- [ ] SelectionKey. OP_ACCEPT:接收(accept)连接就绪事件,表示服务器端接
收到了客户端连接。 - [ ] SelectionKey. OP_CONNECT:连接就绪事件,表示客户端与服务器端的连接
已经建立成功。 - [ ] SelectionKey. OP_READ:读就绪事件,表示通道中已经有了可读数据,可执行
读操作了。 - [ ] SelectionKey. OP_WRITE:写就绪事件,表示可向通道中写入数据了。
通常而言,ServerSocketChannel对象向Selector中注册SelectionKey.OP_ACCEPT事件,而SocketChannel对象向Selector中注册SelectionKey.OP_CONNECT、SelectionKey.OP_READ和SelectionKey.OP_WRITE三种事件。
3.Hadoop RPC基本框架分析
3.1 RPC基本概念
RPC是一种通过网络从远程计算机上请求服务,但不需要了解底层网络技术的协议。RPC协议假定某些传输协议已经存在,如TCP或UDP等,并通过这些传输协议为通信程序之间传递访问请求或者应答信息。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发分布式应用程序更加容易。
3.1.1 RPC组成部分
RPC通常采用客户机/服务器模型。请求程序是一个客户机,而服务提供程序则是一个服务器。一个典型的RPC框架主要包括以下几个部分:
通信模块:两个相互协作的通信模块实现请求-应答协议。它们在客户机和服务器之间传递请求和应答消息,一般不会对数据包进行任何处理。
请求-应答协议的实现方式有两种,分别是同步方式和异步方式。如下图同步模式下客户端程序一直阻塞到服务器端发送的应答请求到达本地;而异步模式则不同,客户端将请求发送到服务器端后,不必等待应答返回,可以做其他事情,待服务器端处理完请求后,主动通知客户端。在高并发应用场景中,一般采用异步模式以降低访问延迟和提高带宽利用率。
Stub程序:客户端和服务器端均包含Stub程序,可将之看作代理程序。它使得远程函数调用表现的跟本地调用一样,对用户程序完全透明。在客户端,它表现的就像一个本地程序,但不直接执行本地调用,而是将请求信息通过网络模块发送给服务器端。此外,当服务器端发送应答后,它会解码对应结果。在服务器端,Stub程序依次进行以下处理:解码请求消息中的参数、调用相应的服务过程和编码应答结果的返回值。
调度程序:调度程序接收来自通信模块的请求消息,并根据其中的标识选择一个Stub程序处理。通常客户端并发请求量比较大时,会采用线程池提高处理效率。
客户程序/服务过程:请求的发出者和请求的处理者。如果是单机环境,客户程序可直接通过函数调用访问服务过程,但在分布式环境下,需要考虑网络通信,这不得不增加通信模块和Stub程序(保证函数调用的透明性)。
3.1.2 RPC工作流程
通常而言,一个RPC请求从发送到获取处理结果,所经历的步骤如下:
步骤1 客户程序以本地方式调用系统产生的Stub程序;
步骤2 该Stub程序将函数调用信息按照网络通信模块的要求封装成消息包,并交给通信模块发送到远程服务器端;
步骤3 远程服务器端接收此消息后,将此消息发送给相应的Stub程序;
步骤4 Stub程序拆封消息,形成被调过程要求的形式,并调用对应的函数;
步骤5 被调用函数按照所获参数执行,并将结果返回给Stub程序;
步骤6 Stub程序将此结果封装成消息,通过网络通信模块逐级地传送给客户程序。
3.2 Hadoop RPC基本框架
在正式介绍Hadoop RPC基本框架之前,先介绍怎么样使用它。Hadoop RPC主要对外提供了两种接口。正所谓知其然,然后知其所以然。
- public static VersionedProtocol getProxy/waitForProxy():构造一个客户端代理对象(该对象实现了某个协议),用于向服务器端发送RPC请求。
- public static Server getServer():为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求。
3.2.1 构建一个简单的Hadoop RPC
通常而言,Hadoop RPC使用方法可分为以下几个步骤。
步骤1 定义RPC协议。RPC协议是客户端和服务器端之间的通信接口,它定义了服务器端对外提供的服务接口。
步骤2 实现RPC协议。Hadoop RPC协议通常是一个Java接口,用户需要实现
该接口。
步骤3 构造并启动RPC Server。
步骤4 构造RPC Client,并发送RPC请求。
这四步没有实操总觉得比较遥远,那我们就动手编码试一下。
// 1. 定义RPC协议
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol{
//版本号。默认情况下,不同版本号的RPC Client和Server之间不能相互通信
public static final long versionID=1L;
String echo(String value)throws IOException;
int add(int v1,int v2)throws IOException;
}
// 2.实现RPC协议
public static class ClientProtocolImpl implements ClientProtocol{
public long getProtocolVersion(String protocol, long clientVersion){
return ClientProtocol.versionID;
}
public String echo(String value)throws IOException{
return value;
}
public int add(int v1,int v2)throws IOException{
return v1+v2;
}
}
// 3.构造并启动RPC Server 新建一个类,主方法如下
public static void main(String[] args){
server=RPC.getServer(new ClientProtocolImpl(),serverHost, serverPort,
numHandlers, false, conf);
server.start();
}
// 4.构造RPC Client ,构建客户端类,方法如下
public static void main(String[] args){
proxy=(ClientProtocol)RPC.getProxy(
ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result=proxy.add(5,6);
String echoResult=proxy.echo("result");
}
3.2.2 Hadoop RPC 组成类分析
Hadoop RPC主要由三个大类组成,分别是RPC、Client和Server,分别对应对外编程接口、客户端实现和服务器端实现。
3.2.2.1 RPC类分析
RPC类实际上是对底层客户机/服务器网络模型的封装,以便为程序员提供一套更方便简洁的编程接口。
RPC类自定义了一个内部类RPC.Server。它继承Server抽象类,并利用Java反射机制实现了call接口(Server抽象类中并未给出该接口的实现),即根据客户端请求中的调用方法名称和对应参数完成方法调用。RPC类包含一个ClientCache类型的成员变量,它根据用户提供的SocketFactory缓存Client对象,以达到重用Client对象的目的。
3.2.2.2 Client类分析
Client主要完成的功能是发送远程过程调用信息并接收执行结果。Client类对外提供了两种接口,一种用于执行单个远程调用。另外一种用于执行批量远程调用。
Client内部有两个重要的内部类,分别是Call和Connection:
Call类:该类封装了一个RPC请求,它包含五个成员变量,分别是唯一标识id、函数调用信息param、函数执行返回值value、出错或者异常信息error和执行完成标识符done。由于Hadoop RPC Server采用了异步方式处理客户端请求,这使得远程过程调用的发生顺序与结果返回顺序无直接关系,而Client端正是通过id识别不同的函数调用。当客户端向服务器端发送请求时,只需填充id和param两个变量,而剩下的三个变量:value, error和done,则由服务器端根据函数执行情况填充。
Connection类:Client与每个Server之间维护一个通信连接。该连接相关的基本信息及操作被封装到Connection类中。其中,基本信息主要包括:通信连接唯一标识(remoteId),与Server端通信的Socket(socket),网络输入数据流(in),网络输出数据流(out),保存RPC请求的哈希表(calls)等
当调用call函数执行某个远程方法时,Client端需要进行如下几个步骤:
步骤1 创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表calls中;
步骤2 调用Connetion类中的sendParam()方法将当前Call对象发送给Server端;
步骤3 Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveResponse()函数获取结果;
步骤4 Client端检查结果处理状态(成功还是失败),并将对应的Call对象从哈希表中删除。
3.2.2.3 Server类分析
Hadoop采用了Master/Slave结构。其中,Master是整个系统的单点,如NameNode或JobTracker,这是制约系统性能和可扩展性的最关键因素之一,而Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server将高并发和可扩展性作为设计目标。为此,ipc.Server采用了很多具有提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等。这些技术均采用了JDK自带的库实现。
Reactor是并发编程中的一种基于事件驱动的设计模式。它具有以下两个特点:
①通过派发/分离I/O操作事件提高系统的并发性能;
②提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。
一个典型的Reactor模式中主要包括以下几个角色。
- [ ] Reactor:IO事件的派发者。
- [ ] Acceptor:接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler。
- [ ] Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。Handler内部往往会有更进一步的层次划分,用来抽象诸如read, decode, compute,encode和send等的过程。在Reactor模式中,业务逻辑被分散的IO事件所打破,所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续上次中断的处理。
- [ ] Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样,数据读出后,立即扔到线程池中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。
Server的主要功能是接收来自客户端的RPC请求,经过调用相应的函数获取结果后,返回给对应的客户端。为此,ipc.Server被划分成三个阶段:接收请求,处理请求和返回结果。各阶段实现细节如下:
1)接收请求
该阶段的主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为两个子阶段:建立连接和接收请求,分别由两种线程完成:Listener和Reader。
整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求。一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理。而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求。至于每个Reader线程负责哪些客户端连接,完全由Listener决定。当前Listener只是采用了简单的轮询分配机制。
Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听(它负责的那部分)客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。
2)处理请求
该阶段的主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。Server端可同时存在多个Handler线程。它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回的结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务交给Responder线程。
3)返回结果
每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回的结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
Server端仅存在一个Responder线程。它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能够将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。
4. Hadoop RPC的发展与展望
当前存在非常多的开源RPC框架,比较有名的有Thrift, Protocol Buffers和Avro。与Hadoop RPC一样,它们均由两部分组成:对象序列化和远程过程调用。相比于Hadoop RPC,它们有以下几个特点。
- [ ] 跨语言特性:前面提到,RPC框架实际上是客户机/服务器模型的一个应用实例。对于Hadoop RPC而言,由于Hadoop采用Java语言编写,因而其RPC客户端和服务器端仅支持Java语言;但对于更通用的RPC框架,如Thrift或者Protocol Buffers等,其客户端和服务器端可采用任何语言编写,如Java, C++,Python等,这给用户编程带来极大的方便。
- [ ] 引入IDL:开源RPC框架均提供了一套接口描述语言(Interface DescriptionLanguage,IDL)。它提供一套通用的数据类型,并以这些数据类型来定义更为复杂的数据类型和对外服务接口。一旦用户按照IDL定义的语法编写完接口文件后,即可根据实际应用需要生成特定的编程语言(如Java, C++,Python等)的客户端和服务器端代码。
- [ ] 协议兼容性:开源RPC框架在设计上均考虑到了协议兼容性问题,即当协议格式发生改变时,比如某个类需要添加或者删除一个成员变量(字段)后,旧版本代码仍然能识别新格式的数据,也就是说,具有向后兼容性。
随着Hadoop版本的不断演化,Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足,具体表现为:
1)从长远发展看,Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现,比如用户希望直接使用C/C++语言读写HDFS中的文件,这就需要有C/C++语言的HDFS客户端。
2)当前Hadoop版本较多,而不同版本之间不能通信。
从0.21.0版本开始,Hadoop尝试着将RPC中的序列化部分剥离开,以便将现有的开源RPC框架集成进来。RPC类变成了一个工厂,它将具体的RPC实现授权给RpcEngine实现类,而现有的开源RPC只要实现RpcEngine接口,便可以集成到Hadoop RPC中。
正如当前的YARN使用的事件处理的方式,能够大大增强并发性,从而提高系统整体性能。
以及Yarn的RPC通讯方式:
YARN中的序列化框架采用了Google开源的Protocol Buffers。Protocol Buffers的引入使得YARN在兼容性方面向前迈进了一大步。
总结
Hadoop RPC是Hadoop多个子系统公用的网络通信模块。其性能和可扩展性直接影响其上层系统的性能和可扩展性,因此扮演着极其重要的角色。
Hadoop RPC分为两层:上层是直接供外面使用的公共RPC接口;下层是一个客户机/服务器模型,该模型在实现过程中用到了Java自带的多个工具包,包括java.lang.reflect(反射机制和动态代理相关类)、java.net(网络编程库)和java.nio(NIO)等。
Hadoop RPC主要由三个大类组成,分别是RPC、Client和Server,分别对应对外编程接口、客户端实现和服务器端实现。其中,Server具有高性能和良好的可扩展性等特点,在具体实现时采用了线程池、事件驱动和Reactor设计模式等机制。
Hadoop MapReduce基于RPC框架实现了6个通信协议,分别是JobSubmissionsProtocol, RefreshUserMappingsProtocol,RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,InterTrackerProtocol和TaskUmbilicalProtocol。这些协议像是系统的“骨架”,支撑起整个MapReduce系统。随着Hadoop的不断演化,更多开源的RPC框架不断和现有RPC机制进行整合,更好的提升Hadoop的并发和处理能力。
好了,今天的文章到这里就结束了,希望对小可爱们有所帮助。
路漫漫其修远兮,吾将上下而求索。让我们一起在不断学习的道路上渐行渐远渐无书。