Hadoop RPC机制详解
-
通信模块。两个相互协作的通信模块实现请求 – 应答协议,它们在客户和服务器之间传递请求和应答消息,一般不会对数据包进行任何处理。请求 – 应答协议的实现一般有同步方式和异步方式两种
-
同步模式下客户端程序一直阻塞到服务器断发送的应答请求到达本地
-
异步模式下将请求发送到服务端后,不必等待应答返回,可以做其他事情
-
Stub程序。客户端和服务器端均包含Stub程序,可以将之看作代理程序。它使得远程函数调用表现的跟本地调用一样,对用户程序完全透明。在客户端,Stub程序像一个本地程序,但不直接执行本地调用,而是将请求信息提供网络模块发送给服务器端,服务器端给客户端发送应答后,客户端Stub程序会解码对应结果。在服务器端,Stub程序依次进行解码请求消息中的参数、调用相应的服务过程和编码应答结果的返回值等处理
-
调度程序。调度程序接收来自通信模块的请求信息,并根据其中的标识选择一个Stub程序进行处理。通常客户端并发请求量比较大时,会采用线程池提高处理效率
-
客户程序/服务过程。请求的发出者和请求的处理者
- 客户程序以本地方式调用系统产生的Stub程序
- 该Stub程序将函数调用信息按照网络通信模块的要求封装成消息包,并交给通信模块发送给远程服务器端
- 远程服务器端接收此消息后,将此消息发送给相应的Stub程序
- Stub程序拆封消息,形成被调过程要求的形式,并调用对应函数
- 被调用函数按照所获参数执行,并将结果返回给Stub程序
- Stub程序将此结果封装成消息,通过网络通信模块逐级地传送给客户程序
- 透明性。这是所有RPC框架最根本的特点,即当用户在一台计算机的程序调用另外一台计算机上的子程序时,用户自身不应感觉到其间设计机器间的通信,而是感觉像是在执行一个本地调用
- 高性能。Hadoop各个系统(HDFS,YARN,MapReduce等)均采用了Master/Slave架构,其中,Master实际上是一个RPC Server,它负责处理集群中所有Slave发送的服务请求,为了保证Master的并发处理能力,RPC Server应是一个高性能服务器,能够高效地处理来自多个Client的并发RPC请求
- 可控性。RPC是Hadoop最底层最核心的模块之一,保证其轻量级,高性能和可控性显得尤为重要
- 序列化层。序列化主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储,在RPC框架中,它主要是用于将用户请求中的参数或者应答转换成字节流以便跨机器传输
- 函数调用层。函数调用层主要功能是定位要调用的而函数并执行该函数,Hadoop RPC采用了Java反射机制与动态代理实现了函数调用
- 网络传输层。网络传输层描述了Client与Server之间消息传输的方式,Hadoop RPC采用了基础TCP/IP的Socket机制
- 服务器端处理框架。服务器端处理框架可被抽象为网络I/O模型,它描述了客户端与服务器间信息的交互方式,它的设计直接决定这服务器端的并发处理能力,而Hadoop RPC采用了基于Reactor设计模式的事件驱动I/O模型
Hadoop RPC总体架构
- public static <T>ProtocolProxy <T> getProxy/waitForProxy() : 构造一个客户端代理对象,用于向服务器发送RPC请求
- public static Server RPC.Builder (Configuration).build() : 为某个协议实例构造一个服务器对象,用于处理客户端发送的请求
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol { public static final long versionID = 1L; String echo(String value) throws IOException; int add(int v1 , int v2) throws IOException; }
public static class ClientProtocolImpl implements ClientProtocol { //重载的方法,用于获取自定义的协议版本号 public long getProtocolVersion(String protocol, long clientVersion) { return ClientProtocol.versionID; } //重载的方法,用于获取协议签名 public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, inthashcode) { return new ProtocolSignature(ClientProtocol.versionID, null); } public String echo(String value) throws IOException { return value; } public int add(int v1, int v2) throws IOException { return v1 + v2; } }
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class) .setInstance(new ClientProtocolImpl()).setBindAddress(ADDRESS).setPort(0) .setNumHandlers(5).build(); server.start(); // BindAddress : 服务器的HOST // Port : 监听端口号,0代表系统随机选择一个端口号 // NumHandlers : 服务端处理请求的线程数目
proxy = (ClientProtocol)RPC.getProxy( ClientProtocol.class, ClientProtocol.versionID, addr, conf); int result = proxy.add(5, 6); String echoResult = proxy.echo("result");
- Call类 : 封装了一个RPC请求,它包含5个成员变量,分别是唯一标识ID、函数调用信息param、函数执行返回值value、出错或者异常信息error和执行完成标识符done。由于Hadoop RPC Server采用异步方式处理客户端请求,这使远程过程调用的发生顺序与结果返回顺序无直接关系,而Client端正式提供ID识别不同的函数调用的。当客户端向服务器端发送请求时,只需填充id和param两个变量,而剩下的三个变量则由服务器根据函数执行情况填充
- Connection类 : Client与每个Server之间维护一个通信连接,与该连接相关的基本信息及操作被封装到Connection类中,基本信息主要包括通信连接唯一标识、与Server端通信的Socket、网络输入数据流(in)、网络输出数据流(out)、保存RPC请求的哈希表(calls)等。操作则包括 :
- addCall — 将一个Call对象添加到哈希表中
- sendParam — 向服务器端发送RPC请求
- receiveResponse — 从服务器端接收已经处理完成的RPC请求
- run — Connection是一个线程类,它的run方法调用了receiveResponse方法,会一直等待接收RPC返回结果
- 创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中
- 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端
- Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveRpcResponse()函数获取结果
- Client检查结果处理状态,并将对应Call对象从哈希表中删除
- 通过派发/分离IO操作事件提高系统的并发性能
- 提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理
- Reactor : I/O事件的派发者
- Acceptor : 接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler
- Handler : 与一个Client通信的实体,并按一定的过程实现业务的处理
- Reader/Sender : 为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程吃中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理
- Reader线程数目。参数ipc.server.read.threadpool.size设置
- 每个Handler线程对应的最大Call数目。参数ipc.server.handler.queue.size设置
- Handler线程数目。参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count设置
- 客户端最大重试次数。参数ipc.client.connect.max.size设置