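This article walks through the HDFS RPC communication framework, with a source-level analysis of the Listener, Reader, Handler, and Responder classes. The Hadoop version is 3.1.1.
Before we start
RPC is built on socket communication, and Hadoop implements it with Java NIO. Readers should already be reasonably familiar with NIO, because this article does not cover that background in any depth.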
https://blog.csdn.net/yhl_jxy/article/details/79332092
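Also, most of the code shown here has been tidied up by the author, so it differs slightly from the Server source.
RPC framework architecture diagram
As the architecture diagram shows, the processing of data on a single socket connection is split across several modules, each handling one specific concern. This lets calls be processed concurrently and keeps the code extensible.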
Listener
The Listener is the listening thread. What does it listen for? Incoming socket connections, which the code calls connections.
Listener.run and doAccept
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
Server.connectionManager.startIdleScan();
while (Server.running) {
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
Server.connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info("Stopping " + Thread.currentThread().getName());
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { }
selector= null;
acceptChannel= null;
// close all connections
Server.connectionManager.stopIdleScan();
Server.connectionManager.closeAll();
}
}
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = Server.connectionManager.register(channel);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
Server.connectionManager.droppedConnections.getAndIncrement();
continue;
}
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}
In short: accept the channel, wrap it in a Connection, and hand it to a Reader.
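The hand-off from listener to reader can be sketched as follows. The listing above calls getReader() and reader.addConnection(c) without showing them, so the round-robin selection and the per-reader pending queue below are assumptions. SketchReader and SketchReaderPool are hypothetical names, and connections are represented by plain strings rather than real channels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a Reader: it only owns a queue of pending connections.
class SketchReader {
    final String name;
    final BlockingQueue<String> pendingConnections = new LinkedBlockingQueue<>();

    SketchReader(String name) {
        this.name = name;
    }

    // Called by the listener thread. A real reader thread would later drain
    // this queue and register each channel with its own read selector.
    void addConnection(String conn) {
        pendingConnections.add(conn);
    }
}

// Hypothetical stand-in for the listener's reader selection.
class SketchReaderPool {
    private final List<SketchReader> readers = new ArrayList<>();
    private int current = 0;

    SketchReaderPool(int size) {
        for (int i = 0; i < size; i++) {
            readers.add(new SketchReader("reader-" + i));
        }
    }

    // Assumed policy: round-robin, so each accepted connection goes to the next reader.
    SketchReader getReader() {
        current = (current + 1) % readers.size();
        return readers.get(current);
    }

    // Hands out fake connections and reports how many each reader received.
    static int[] distribute(int readerCount, int connections) {
        SketchReaderPool pool = new SketchReaderPool(readerCount);
        for (int i = 0; i < connections; i++) {
            pool.getReader().addConnection("conn-" + i);
        }
        int[] counts = new int[readerCount];
        for (int i = 0; i < readerCount; i++) {
            counts[i] = pool.readers.get(i).pendingConnections.size();
        }
        return counts;
    }
}
```

Because the listener only enqueues, it never blocks on a slow client. The reader's own loop decides when to register the queued channels.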
Reader
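The Reader plays a central role in the RPC framework. Everything that happens before processOneRpc in the companion article on the HDFS RPC protocol (HDFSRPC协议详解) is done by the Reader. In summary:
- Checking the first 7 bytes of the RPC connection.
- The SASL handshake and authentication.
- Reading the IpcConnectionContext.
- Preparation for processOneRpc, including parsing the RequestHeaderProto.
Note also that a single Reader performs all of these steps itself rather than splitting them across several passes. As soon as one read has produced a call, the Reader moves on to reading the next call, so calls are effectively concurrent and are processed by the handlers.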
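As an illustration of the first item in the list above, here is a minimal sketch of a 7-byte connection header check. The layout is an assumption, not taken from this article: a 4-byte "hrpc" magic, a 1-byte version (assumed to be 9), a 1-byte service class, and a 1-byte auth protocol. ConnectionHeaderSketch and its methods are hypothetical names, and the values should be checked against the Hadoop version in use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ConnectionHeaderSketch {
    // Assumed constants; verify them against the Hadoop release you run.
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);
    static final int ASSUMED_VERSION = 9;

    // Returns "ok" when the header passes, otherwise a short reason.
    static String check(byte[] header) {
        if (header == null || header.length != 7) {
            return "header must be exactly 7 bytes";
        }
        ByteBuffer buf = ByteBuffer.wrap(header);
        for (byte expected : MAGIC) {
            if (buf.get() != expected) {
                return "bad magic";
            }
        }
        int version = buf.get() & 0xFF;
        if (version != ASSUMED_VERSION) {
            return "unsupported version " + version;
        }
        // The last two bytes (service class, auth protocol) are not
        // validated in this sketch.
        return "ok";
    }

    // Builds a header for experiments; magic is expected to be 4 ASCII characters.
    static byte[] build(String magic, int version, int serviceClass, int authProtocol) {
        ByteBuffer buf = ByteBuffer.allocate(7);
        buf.put(magic.getBytes(StandardCharsets.US_ASCII));
        buf.put((byte) version).put((byte) serviceClass).put((byte) authProtocol);
        return buf.array();
    }
}
```

Rejecting a bad header this early means the server never allocates a call object for a client that is not speaking the expected protocol.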
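The Reader source is quite simple: in essence it loops over connection.readAndProcess(). This article does not go into readAndProcess; see the HDFS RPC protocol article (HDFSRPC协议详解) if you are interested.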
@Override
public void run() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
}
}
}
private synchronized void doRunLoop() {
while (Server.running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select();
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isReadable()) {
doRead(key);
}
} catch (CancelledKeyException cke) {
// something else closed the connection, ex. responder or
// the listener doing an idle scan. ignore it and let them
// clean up.
LOG.info(Thread.currentThread().getName() +
": connection aborted from " + key.attachment());
}
key = null;
}
} catch (InterruptedException e) {
if (Server.running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
} catch (Throwable re) {
LOG.error("Bug in read selector!", re);
//ExitUtil.terminate(1, "Bug in read selector!");
}
}
}
//from Listener doRead
void doRead(SelectionKey key) throws InterruptedException {
int count;
Connection c = (Connection)key.attachment();
if (c == null) {
return;
}
c.setLastContact(Time.now());
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
// Any exceptions that reach here are fatal unexpected internal errors
// that could not be sent to the client.
LOG.info(Thread.currentThread().getName() +
": readAndProcess from client " + c +
" threw exception [" + e + "]", e);
count = -1; //so that the (count < 0) block is executed
}
// setupResponse will signal the connection should be closed when a
// fatal response is sent.
if (count < 0 || c.shouldClose()) {
Server.closeConnection(c);
c = null;
}
else {
c.setLastContact(Time.now());
}
}
CallQueue
The callQueue holds the calls that are waiting to be handled. Because the call queue in HDFS is fairly complex, it will be covered in a separate article.
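Until then, the role of the queue can be sketched with a plain bounded LinkedBlockingQueue standing in for the real, more elaborate call queue. CallQueueSketch, the capacity of 4, the integer call ids, and the -1 shutdown marker are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CallQueueSketch {
    // Pushes `calls` fake call ids through a bounded queue that is drained by
    // `handlers` consumer threads, and returns the ids that were handled.
    static List<Integer> run(int calls, int handlers) {
        final int shutdownMarker = -1; // hypothetical way to stop the handlers
        BlockingQueue<Integer> callQueue = new LinkedBlockingQueue<>(4);
        List<Integer> handled = Collections.synchronizedList(new ArrayList<Integer>());
        Thread[] threads = new Thread[handlers];
        for (int i = 0; i < handlers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        int call = callQueue.take(); // blocks while the queue is empty
                        if (call == shutdownMarker) {
                            return;
                        }
                        handled.add(call);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            for (int id = 0; id < calls; id++) {
                callQueue.put(id); // reader side: blocks while the queue is full
            }
            for (int i = 0; i < handlers; i++) {
                callQueue.put(shutdownMarker);
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }
}
```

The bounded capacity is what gives back-pressure: when handlers fall behind, put() blocks the producing side instead of letting calls pile up in memory.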
Handler
The Handler thread is also fairly simple: in practice it just executes call.run().
@Override
public void run() {
LOG.debug(Thread.currentThread().getName() + ": starting");
while (Server.running) {
try {
final Call call = Server.callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call);
}
CurCall.set(call);
/*TODO
UserGroupInformation remoteUser = call.getRemoteUser();
if (remoteUser != null) {
remoteUser.doAs(call);
} else {
call.run();
}*/
call.run();
} catch (InterruptedException e) {
if (Server.running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (Exception e) {
LOG.info(Thread.currentThread().getName() + " caught an exception", e);
} finally {
CurCall.set(null);
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}
The main difficulty is how call.run() actually gets executed. To understand call.run, you first need to understand protocols.
Protocols
Every server has its own protocols, which are first grouped by rpcKind.
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
In 3.x the rpcKind in use is RPC_PROTOCOL_BUFFER, so it serves as the example here.
The protocols registered for RPC_PROTOCOL_BUFFER are stored in a HashMap.
Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
The key is a ProtoNameVer; note how hashCode is implemented.
static class ProtoNameVer {
final String protocol;
final long version;
ProtoNameVer(String protocol, long ver) {
this.protocol = protocol;
this.version = ver;
}
@Override
public boolean equals(Object o) {
if (o == null)
return false;
if (this == o)
return true;
if (! (o instanceof ProtoNameVer))
return false;
ProtoNameVer pv = (ProtoNameVer) o;
return ((pv.protocol.equals(this.protocol)) &&
(pv.version == this.version));
}
@Override
public int hashCode() {
return protocol.hashCode() * 37 + (int) version;
}
}
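The reason equals and hashCode matter is that the server looks up an implementation with a freshly built key, not with the key object used at registration time. A small sketch of that lookup follows. NameVerKey mirrors the class above, while ProtocolRegistrySketch, register, and lookup are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Mirrors the ProtoNameVer key shown above.
class NameVerKey {
    final String protocol;
    final long version;

    NameVerKey(String protocol, long version) {
        this.protocol = protocol;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NameVerKey)) {
            return false;
        }
        NameVerKey other = (NameVerKey) o;
        return other.protocol.equals(protocol) && other.version == version;
    }

    @Override
    public int hashCode() {
        // Same formula as the listing above: name hash mixed with the version.
        return protocol.hashCode() * 37 + (int) version;
    }
}

// Hypothetical registry keyed by (protocol name, version).
class ProtocolRegistrySketch {
    private final Map<NameVerKey, Object> impls = new HashMap<>();

    void register(String protocol, long version, Object impl) {
        impls.put(new NameVerKey(protocol, version), impl);
    }

    // Builds a new key on every call, so the lookup relies on value equality.
    Object lookup(String protocol, long version) {
        return impls.get(new NameVerKey(protocol, version));
    }
}
```

Without the overrides, the HashMap would fall back to identity comparison and every lookup with a new key object would miss.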
So every protocol must carry a protocol name and a version, which is what the ProtocolInfo annotation provides.
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
String protocolName(); // the name of the protocol (i.e. rpc service)
long protocolVersion() default -1; // default means not defined use old way
}
A protocol interface looks something like this.
@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
* Protocol that a clients use to communicate with the NameNode.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {
}
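Because the annotation is retained at runtime, the name and version can be read back from the interface with ordinary reflection. The sketch below uses a stand-in annotation (SketchProtocolInfo) and made-up interfaces so that it compiles without Hadoop on the classpath. Falling back to the class name when the annotation is absent is an assumption about how such a helper might behave.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Stand-in for ProtocolInfo, declared locally so the sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME)
@interface SketchProtocolInfo {
    String protocolName();
    long protocolVersion() default -1;
}

// Hypothetical annotated protocol interface.
@SketchProtocolInfo(protocolName = "example.EchoProtocol", protocolVersion = 1)
interface EchoProtocolPB {
}

// Hypothetical protocol interface without the annotation.
interface UnannotatedProtocol {
}

class ProtocolInfoReader {
    // Reads the declared name, or falls back to the class name (assumed behavior).
    static String nameOf(Class<?> protocol) {
        SketchProtocolInfo info = protocol.getAnnotation(SketchProtocolInfo.class);
        return info != null ? info.protocolName() : protocol.getName();
    }

    // Reads the declared version, or -1 when nothing is declared.
    static long versionOf(Class<?> protocol) {
        SketchProtocolInfo info = protocol.getAnnotation(SketchProtocolInfo.class);
        return info != null ? info.protocolVersion() : -1L;
    }
}
```

The pair returned by nameOf and versionOf is exactly what a (name, version) map key needs at registration time.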
So where do the methods invoked by reflection come from? ClientNamenodeProtocol.BlockingInterface is generated by the protobuf compiler; see the service definition at the end of ClientNamenodeProtocol.proto.
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
rpc getServerDefaults(GetServerDefaultsRequestProto)
returns(GetServerDefaultsResponseProto);
rpc create(CreateRequestProto)returns(CreateResponseProto);
rpc append(AppendRequestProto) returns(AppendResponseProto);
rpc setReplication(SetReplicationRequestProto)
returns(SetReplicationResponseProto);
rpc setStoragePolicy(SetStoragePolicyRequestProto)
...
}
Compiling this yields ClientNamenodeProtocol.BlockingInterface, which contains the method list.
Our own implementation class only needs to implement ClientNamenodeProtocolPB, as ClientNamenodeProtocolServerSideTranslatorPB does.
//add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);
Finally, call.run uses the RequestHeaderProto to locate the corresponding implementation class.
message RequestHeaderProto {
/** Name of the RPC method */
required string methodName = 1;
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get metainfo
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto
*/
required string declaringClassProtocolName = 2;
/** protocol version of class declaring the called method */
required uint64 clientProtocolVersion = 3;
}
It then invokes the implementation's method via reflection.
Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
RequestHeaderProto rpcRequest = request.getRequestHeader();
String methodName = rpcRequest.getMethodName();
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get info
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto.
*/
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
//LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
LOG.warn(msg);
throw new RpcNoSuchMethodException(msg);
}
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = request.getValue(prototype);
Message result = null;
long startTime = Time.now();
int qTime = (int) (startTime - receiveTime);
Exception exception = null;
boolean isDeferred = false;
try {
//server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
} catch (ServiceException e) {
exception = (Exception) e.getCause();
throw (Exception) e.getCause();
} catch (Exception e) {
exception = e;
throw e;
} finally {
int processingTime = (int) (Time.now() - startTime);
//if (LOG.isDebugEnabled()) {
String msg =
"Served: " + methodName + (isDeferred ? ", deferred" : "") +
", queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
//LOG.debug(msg);
LOG.info(msg);
//LOG.info("params:" + param.toString());
//LOG.info("result:" + result.toString());
//}
String detailedMetricsName = (exception == null) ?
methodName :
exception.getClass().getSimpleName();
//server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
}
return RpcWritable.wrap(result);
}
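The flow of the method above is: look up the implementation by protocol name and version, find the method by name, invoke it, and unwrap the real exception. It can be imitated without protobuf. The sketch below swaps in plain java.lang.reflect in place of BlockingService and MethodDescriptor, so it only illustrates the shape of the dispatch, not the actual protobuf mechanism. EchoServiceImpl and DispatchSketch are hypothetical names, and string parameters stand in for protobuf messages.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service implementation.
class EchoServiceImpl {
    public String echo(String message) {
        return "echo: " + message;
    }
}

class DispatchSketch {
    private final Map<String, Object> impls = new HashMap<>();

    void addProtocol(String protocolName, long version, Object impl) {
        impls.put(protocolName + "#" + version, impl);
    }

    String call(String protocolName, long version, String methodName, String param) {
        // Step 1: find the implementation registered for (name, version).
        Object impl = impls.get(protocolName + "#" + version);
        if (impl == null) {
            throw new IllegalArgumentException("Unknown protocol " + protocolName);
        }
        // Step 2: find the method by name, as findMethodByName does for descriptors.
        Method method;
        try {
            method = impl.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                "Unknown method " + methodName + " called on " + protocolName + " protocol.");
        }
        // Step 3: invoke, and surface the underlying cause rather than the wrapper,
        // similar to how the listing above rethrows ServiceException.getCause().
        try {
            return (String) method.invoke(impl, param);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Keeping the lookup table separate from the invocation step is what lets one server port host several protocols at once.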
Once the call completes, any returned Message is stored in rpccall.rpcResponse, and the call is then placed on the ResponseQueue.
ResponseQueue
The ResponseQueue lives in the connection and holds the calls that have finished processing.
Responder
The Responder thread is mainly responsible for sending call results back to the client.
private boolean processResponse(LinkedList<RpcCall> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
RpcCall call = null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, then we are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
//
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
if (!call.rpcResponse.hasRemaining()) {
//Clear out the response buffer so it can be collected
call.rpcResponse = null;
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
} else {
//
// If we were unable to write the entire response out, then
// insert in Selector queue.
//
call.connection.responseQueue.addFirst(call);
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = Time.now();
incPending();
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
Server.closeConnection(call.connection);
}
}
return done;
}
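The key idea in processResponse is partial-write handling: write what the channel will take, and if bytes remain, put the call back at the head of the queue so ordering is preserved. The sketch below isolates that idea. ThrottledChannel is a hypothetical test double that accepts a fixed number of bytes per write, ResponseQueueSketch is a hypothetical name, and the selector registration for OP_WRITE done by the real code is deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Test double: takes at most `budget` bytes per write(), like a socket whose
// send buffer is nearly full.
class ThrottledChannel implements WritableByteChannel {
    private final int budget;
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    ThrottledChannel(int budget) {
        this.budget = budget;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(budget, src.remaining());
        for (int i = 0; i < n; i++) {
            sink.write(src.get());
        }
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

class ResponseQueueSketch {
    final Deque<ByteBuffer> responseQueue = new ArrayDeque<>();

    // Returns true when nothing is left to send on this channel.
    boolean processOne(WritableByteChannel channel) throws IOException {
        ByteBuffer response = responseQueue.pollFirst();
        if (response == null) {
            return true;
        }
        channel.write(response);
        if (response.hasRemaining()) {
            // Partial write: put it back at the head so order is preserved.
            responseQueue.addFirst(response);
            return false;
        }
        return responseQueue.isEmpty();
    }

    // Counts how many passes are needed to flush one response of the given size.
    static int passesNeeded(int responseBytes, int bytesPerWrite) {
        ResponseQueueSketch queue = new ResponseQueueSketch();
        queue.responseQueue.add(ByteBuffer.allocate(responseBytes));
        ThrottledChannel channel = new ThrottledChannel(bytesPerWrite);
        int passes = 0;
        try {
            boolean done = false;
            while (!done) {
                done = queue.processOne(channel);
                passes++;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return passes;
    }
}
```

In the real Responder the retry is not a tight loop as in passesNeeded. The leftover call waits until the selector reports the channel writable again.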