本文主要阐述HDFSRPC安全认证相关的实现。主要介绍Token相关的实现。
写在前面
相关blog
https://blog.csdn.net/hncscwc/article/details/124722784
https://blog.csdn.net/hncscwc/article/details/124958357
Token由来
在探究完Kerberos,我一直在想一个问题,rpcConnection已经完成了验证,那为何还需要token?首先需要对yarn有一定的了解,我们知道mapreduce框架是把目标变成多个map,然后reduce出结果。Yarn在执行多个map、reduce的时候,是通过container来运行的。Container本质上是一个独立程序,执行了yarn分配的任务。当Container进程要去访问hdfs的时候,如果使用Kerberos,kdc验证服务存在的不可靠和性能问题(多机多container并发极高)必然会极大的限制大数据平台的稳定,尤其是当有大量用户请求需要通过kdc来获取tgt票据时。因此Token认证被引入充当kerberos的补充,在兼顾安全认证的同时,性能没有较大的损耗。在hadoop中,token主要包括DelegationToken,以及其他一些token,例如之前文件介绍过的BlockToken,以及yarn中一系列的token。
Token中yarn container流程图
Token的应用
当完成kerberos验证以后,服务主体的可以通过getDelegationToken接口来获取token。当服务主体下面的的进程需要去访问hdfs的时候,可以通过token来访问。
Token的验证也在rpc的sasl中,但是步骤跟简单,如下:
server当收到client negotiate请求以后,会返回多个auth。
auths {
method: "TOKEN"
mechanism: "DIGEST-MD5"
protocol: ""
serverId: "default"
challenge: "realm=\"default\",nonce=\"svFDnzmhsk40oN5z6vnUFgYgawR17w+XvxiX1Z3M\",charset=utf-8,algorithm=md5-sess"
}
auths {
method: "KERBEROS"
mechanism: "GSSAPI"
protocol: "root"
serverId: "node17"
}
client接收完negotiate应答后,可以通过服务主体获取的token来initSaslClient,然后发送Initiate请求。Server接收到Initiate请求,会通过token初始化saslServer,不同于Kerberos,saslserver验证完token会立马complete。这时候server会直接返回success应答给客户端。客户端接收到success应答以后即完成SaslClient的初始化。
可以看出token验证的整个过程更简单,而且本质上就是server验证了一下client的token,消耗更少,性能更高。
token验证本身与用户密码生成没有任何关系,主要都是java原生类来实现。代码如下:
public class TokenTest {
public static final String SASL_DEFAULT_REALM = "default";
public static final String USERNAME = "tokentestuser";
public static final char[] PASSWORD = new char[]{'1'};
public static void main(String[] args) throws SaslException {
String mechanism = "DIGEST-MD5";
CallbackHandler serverCallback = new SaslDigestCallbackHandler();
String protocol = "";
String serverId = SASL_DEFAULT_REALM;
SaslServer saslServer = FastSaslServerFactory.getInstance().createSaslServer(mechanism, protocol, serverId, null, serverCallback);
String saslUser = null;
Map<String, String> saslProperties = new HashMap<String, String>();
saslProperties.put("javax.security.sasl.qop", "auth");
saslProperties.put("javax.security.sasl.server.authentication", "true");
CallbackHandler clientCallback = new SaslClientCallbackHandler();
SaslClient saslClient = Sasl.createSaslClient(new String[]{mechanism}, saslUser, protocol, serverId, saslProperties, clientCallback);
byte[] response = saslServer.evaluateResponse(new byte[0]);
System.out.println("NEGOTIATE:" + new String(response));
byte[] request = saslClient.evaluateChallenge(response);
System.out.println("INITIATE:" + new String(request));
byte[] response2 = saslServer.evaluateResponse(request);
System.out.println("SUCCESS:" + new String(response2));
System.out.println("server complete:" + saslServer.isComplete());
saslClient.evaluateChallenge(response2);
System.out.println("client complete:" + saslClient.isComplete());
}
public static class SaslDigestCallbackHandler implements CallbackHandler {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL DIGEST-MD5 Callback");
}
}
if (pc != null) {
pc.setPassword(PASSWORD);
}
if (ac != null) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (authid.equals(authzid)) {
ac.setAuthorized(true);
} else {
ac.setAuthorized(false);
}
if (ac.isAuthorized()) {
ac.setAuthorizedID(authzid);
}
}
}
}
private static class SaslClientCallbackHandler implements CallbackHandler {
private final String userName;
private final char[] userPassword;
public SaslClientCallbackHandler() {
this.userName = USERNAME;
this.userPassword = PASSWORD;
}
@Override
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
for (Callback callback : callbacks) {
if (callback instanceof RealmChoiceCallback) {
continue;
} else if (callback instanceof NameCallback) {
nc = (NameCallback) callback;
} else if (callback instanceof PasswordCallback) {
pc = (PasswordCallback) callback;
} else if (callback instanceof RealmCallback) {
rc = (RealmCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL client callback");
}
}
if (nc != null) {
nc.setName(userName);
}
if (pc != null) {
pc.setPassword(userPassword);
}
if (rc != null) {
rc.setText(rc.getDefaultText());
}
}
}
}
程序运行输出:
NEGOTIATE:realm="default",nonce="alYJcFcQ1r8azJmG4E+9Vy4HJt7AfNyJIXhGCvcD",charset=utf-8,algorithm=md5-sess
INITIATE:charset=utf-8,username="tokentestuser",realm="default",nonce="alYJcFcQ1r8azJmG4E+9Vy4HJt7AfNyJIXhGCvcD",nc=00000001,cnonce="nA2o8sejSYExOtEt8ELnWJXob3KDHOIF2OlaxozQ",digest-uri="/default",maxbuf=65536,response=e388c2b4a0f68f94607e01b033ef61b2,qop=auth
SUCCESS:rspauth=af3865533148b4f6539b785ce2958854
server complete:true
client complete:true
Client当收到server的negotiate response后,会通过某个算法生成response,然后发送initiate request。Server会通过同样的算法来生成自己的验证值来比较response,成功以后会同样的算法生成rspauth。Client收到rspauth以后,会用同样的算法来成自己的验证值来比较rspauth。
Token验证算法
protected byte[] generateResponseValue(
String authMethod,
String digestUriValue,
String qopValue,
String usernameValue,
String realmValue,
char[] passwdValue,
byte[] nonceValue,
byte[] cNonceValue,
int nonceCount,
byte[] authzidValue
) throws NoSuchAlgorithmException,
UnsupportedEncodingException,
IOException {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] hexA1, hexA2;
ByteArrayOutputStream A2, beginA1, A1, KD;
// A2
// --
// A2 = { "AUTHENTICATE:", digest-uri-value,
// [:00000000000000000000000000000000] } // if auth-int or auth-conf
//
A2 = new ByteArrayOutputStream();
A2.write((authMethod + ":" + digestUriValue).getBytes(encoding));
if (qopValue.equals("auth-conf") ||
qopValue.equals("auth-int")) {
logger.log(Level.FINE, "DIGEST04:QOP: {0}", qopValue);
A2.write(SECURITY_LAYER_MARKER.getBytes(encoding));
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST05:A2: {0}", A2.toString());
}
md5.update(A2.toByteArray());
byte[] digest = md5.digest();
hexA2 = binaryToHex(digest);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST06:HEX(H(A2)): {0}", new String(hexA2));
}
// A1
// --
// H(user-name : realm-value : passwd)
//
beginA1 = new ByteArrayOutputStream();
beginA1.write(stringToByte_8859_1(usernameValue));
beginA1.write(':');
// if no realm, realm will be an empty string
beginA1.write(stringToByte_8859_1(realmValue));
beginA1.write(':');
beginA1.write(stringToByte_8859_1(new String(passwdValue)));
md5.update(beginA1.toByteArray());
digest = md5.digest();
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST07:H({0}) = {1}",
new Object[]{beginA1.toString(), new String(binaryToHex(digest))});
}
// A1
// --
// A1 = { H ( {user-name : realm-value : passwd } ),
// : nonce-value, : cnonce-value : authzid-value
//
A1 = new ByteArrayOutputStream();
A1.write(digest);
A1.write(':');
A1.write(nonceValue);
A1.write(':');
A1.write(cNonceValue);
if (authzidValue != null) {
A1.write(':');
A1.write(authzidValue);
}
md5.update(A1.toByteArray());
digest = md5.digest();
H_A1 = digest; // Record H(A1). Use for integrity & privacy.
hexA1 = binaryToHex(digest);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST08:H(A1) = {0}", new String(hexA1));
}
//
// H(k, : , s);
//
KD = new ByteArrayOutputStream();
KD.write(hexA1);
KD.write(':');
KD.write(nonceValue);
KD.write(':');
KD.write(nonceCountToHex(nonceCount).getBytes(encoding));
KD.write(':');
KD.write(cNonceValue);
KD.write(':');
KD.write(qopValue.getBytes(encoding));
KD.write(':');
KD.write(hexA2);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST09:KD: {0}", KD.toString());
}
md5.update(KD.toByteArray());
digest = md5.digest();
byte[] answer = binaryToHex(digest);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "DIGEST10:response-value: {0}",
new String(answer));
}
return (answer);
}
本质上是把用户名密码和一些参数,放入MD5.update之中,最终生成一个MD5值。
值得注意的是生成response和rspauth时只有第一个参数authMethod不一样,一个为
AUTHENTICATE,一个为空字符串。
digestResp.write(generateResponseValue("AUTHENTICATE",
digestUri, negotiatedQop, username,
negotiatedRealm, passwd, nonce, cnonce,
nonceCount, authzidBytes));
byte[] expected = generateResponseValue("",
digestUri, negotiatedQop, username, negotiatedRealm,
passwd, nonce, cnonce, nonceCount, authzidBytes);
Token的统一管理
Hadoop中Delegation Tokens的生成和验证主要依赖于HMAC机制。但是实际的实现可以自定义。主要原因是由于生成和验证都是在server端实现。Token相关的rpc接口如下:
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException;
long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException;
void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException;
一般的密码生成实现是生成用户名密码,存入数据库,然后通过查表验证。Token的实现略有不同,由于是HMAC来生成密码,所以密码是实时生成的,但是要保存HMAC的key,类似于AES256算法的key,key也不是固定的,是会变化的,所以要记录key。所以Token的持久化主要是持久化key和Token,也是通过proto格式来存,在fsimage.proto中。
message SecretManagerSection {
message DelegationKey {
optional uint32 id = 1;
optional uint64 expiryDate = 2;
optional bytes key = 3;
}
message PersistToken {
optional uint32 version = 1;
optional string owner = 2;
optional string renewer = 3;
optional string realUser = 4;
optional uint64 issueDate = 5;
optional uint64 maxDate = 6;
optional uint32 sequenceNumber = 7;
optional uint32 masterKeyId = 8;
optional uint64 expiryDate = 9;
}
optional uint32 currentId = 1;
optional uint32 tokenSequenceNumber = 2;
optional uint32 numKeys = 3;
optional uint32 numTokens = 4;
// repeated DelegationKey keys
// repeated PersistToken tokens
}