从零开始写一个RPC框架——一切从这里开始

RPC,这个名词在我上学及找工作的的几年前是比较少能谈及到的,我也是工作之后才第一次接触到RPC框架Dubbo(其实也不能这么说,Http也可以算作RPC)。

匆匆几年,不知什么时候RPC这个名词已经是招聘JD里面的常客了?

RPC是什么?

RPC是什么,第一次用Dubbo的时候我也是很懵逼的。

受当时的知识面所局限,我在最开始的接触的Dubbo时一直很不理解为什么我只是用定义的接口调用了一个方法,远端的服务就能接收到我的请求并处理然后响应结果。

后来使用了Debug大法才恍然大悟,原来还能这么玩。

下面引用一段网上经典的描述及图片来对RPC做一个简单的理解。

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC调用过程/侵删

RPC事件顺序

  1. 客户端(client function)调用客户端存根(client stub,可以理解为一个代理)。该调用是本地过程调用。
  2. 客户端存根(client stub)将参数打包到消息中,并进行系统调用以发送消息。
  3. 客户端的操作系统将消息从客户端计算机发送到服务器计算机。
  4. 服务器的操作系统将传入的数据包传递到服务器存根(server stub,可以理解为一个代理)。
  5. 服务器存根(server stub)解包消息得到调用参数。
  6. 服务器存根(server stub)调用真实的服务(server function)获得结果并以相同的方式反向返回结果。

RPC框架的基本实现

根据上面的理论我们大致知道一个RPC框架该怎么去实现。

对于一个0.0.1版本的RPC框架,我们的需求不多,只要能够调用成功服务端并能成功返回结果就OK。

RPC服务端的实现

首先我们需要写一个服务端,暂时使用BIO来处理网络通信,并且支持多线程处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class RpcServer {

/**
* 服务端绑定的IP
*/
private String host;

/**
* 服务端绑定的端口
*/
private int port;

private ServerSocket server;

/**
* RPC服务映射,不支持一个服务接口有多个服务实现
* 服务接口类名 => 服务接口实现类
*/
private final ConcurrentMap<String, Object> serviceMappings = new ConcurrentHashMap<>();

/**
* RpcServer的运行状态
*/
private volatile boolean running = false;

public RpcServer(String host, int port) {
this.host = host;
this.port = port;
}

/**
* 启动服务端,并监听客户端Socket的连接请求
*
* @throws IOException
*/
public void startServer() throws IOException {
// 注册一个ShutdownHook,用于在应用关闭时来调用RpcServer的关闭逻辑
Runtime.getRuntime().addShutdownHook(new Thread(RpcServer.this::stop));
server = new ServerSocket();
server.bind(new InetSocketAddress(this.host, this.port));
System.out.println("服务器启动成功,绑定地址:" + this.server.getLocalSocketAddress());
running = true;
while (running) {
try {
Socket socket = this.server.accept();
System.out.println("接收到客户端连接:" + socket);
new Thread(new RequestHandler(socket),
"RequestHandler[" + socket.getRemoteSocketAddress().toString() + "] Thread").start();
} catch (IOException e) {
System.out.println("接收连接异常:" + e);
}
}
}

/**
* 关闭RpcServer
*/
public void stop() {
System.out.println("服务器关闭");
running = false;
try {
this.server.close();
} catch (Exception e) {
System.out.println("服务器关闭异常:" + e);
}
}

/**
* 注册一个接口服务
* RPC服务接口 => RPC接口服务实例
*
* @param interfaceClass RPC接口服务类
* @param serviceObject RPC接口服务实例
*/
public void addServiceMapping(Class<?> interfaceClass, Object serviceObject) {
if (null != serviceMappings.get(interfaceClass.getName())) {
throw new RpcException("接口[" + interfaceClass.getName() + "]已经存在一个实现");
}
System.out.println("注册RPC服务[" + interfaceClass.getName() + " => " + serviceObject + "]");
this.serviceMappings.put(interfaceClass.getName(), serviceObject);
}

/**
* 客户端请求处理
*/
private final class RequestHandler implements Runnable {
// 客户端请求处理。。。
}
}

上面的代码可能有人会疑惑RpcServer#addServiceMapping()方法是干什么用的,我们先暂时放下RequestHandler这个类的实现,看看这个RpcServer我们应该怎么来启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 1. 定义一个RPC服务接口
*/
public interface HelloService {

String sayHello(String name);

}

/**
* 2. 实现HelloService接口
*/
public class HelloServiceImpl implements HelloService {

@Override
public String sayHello(String name) {
return "Hello, " + name;
}

}

/**
* 3. 启动RpcServer,注册需要提供的Rpc服务与实例映射。
*/
public class Startup {

public static void main(String[] args) throws IOException {
// 1.设置RPC的监听地址与端口
RpcServer rpcServer = new RpcServer("localhost", 10210);
// 2.服务启动前,注册要提供的RPC服务接口与RPC服务接口的实现
rpcServer.addServiceMapping(HelloService.class, new HelloServiceImpl());
// 3.启动服务
rpcServer.startServer();
}

}

通过Startup#main()中的三个步骤就完成了RpcServer的启动,很显然RpcServer#addServiceMapping()就是用来将所有需要提供服务的接口添加一个mapping。

RpcServer的代码中我们用RequestHander来处理服务端接收到Socket连接,RequestHandler主要职责有三:读取请求数据,调用服务目标方法,写出响应数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 客户端请求处理
*/
private final class RequestHandler implements Runnable {

private Socket socket;

private ObjectInputStream in;

private ObjectOutputStream out;

RequestHandler(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
RpcResponse rpcResponse = new RpcResponse();
try {
// 读取数据
RpcRequest rpcRequest = readData();
// 调用服务,暂时忽略自定义的异常及下面的异常处理
Object result = invokeService(rpcRequest);
// 写出数据
rpcResponse.setResponse(result);
writeData(rpcResponse);
} catch (IOException e) {
System.out.println("从Socket中读取或写入数据异常:" + e);
} catch (Exception e) {
rpcResponse.setThrowable(e);
try {
writeData(rpcResponse);
} catch (IOException ie) {
System.out.println("服务端响应数据异常:" + ie);
}
} finally {
IOUtils.close(this.socket, this.in, this.out);
}
}

/**
* 将响应结果写出
*
* @param response 响应结果
* @throws IOException 写出结果发生异常
*/
private void writeData(RpcResponse response) throws IOException {
out = new ObjectOutputStream(this.socket.getOutputStream());
out.writeObject(response);
out.flush();
}

/**
* 从Socket中读取数据
*
* @return Rpc请求
* @throws IOException 读取数据异常
* @throws ClassNotFoundException Rpc请求,或者Rpc请求中所带的class在服务端不存在
*/
private RpcRequest readData() throws IOException, ClassNotFoundException {
this.in = new ObjectInputStream(this.socket.getInputStream());
return (RpcRequest) in.readObject();
}

/**
* 调用服务的方法
*
* @param rpcRequest Rpc请求
* @return 调用结果
*/
private Object invokeService(RpcRequest rpcRequest) {
Object serviceObject = serviceMapping.get(rpcRequest.getInterfaceClassName());
if (null == serviceObject) {
throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]没有找到可用的服务");
}
try {
Method method = serviceObject.getClass().getMethod(rpcRequest.getMethodName(),
rpcRequest.getParameterTypes());
return method.invoke(serviceObject, rpcRequest.getArguments());
} catch (NoSuchMethodException | SecurityException e) {
throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]没有找到目标方法", e);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]调用目标方法异常", e);
}
}
}

RPC客户端的实现

如何实现客户端,让调用方在无感知的情况下就能实现RPC远程调用呢?我们可以使用JDK提供的动态代理来实现调用方的无感知的远程调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 使用JDK动态代理来无感知的完成RPC调用的内部细节
*/
public class RpcInvocationHandler implements InvocationHandler {

private RpcInvoker<?> invoker;

public RpcInvocationHandler(RpcInvoker<?> invoker) {
this.invoker = invoker;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 排除掉所有定义在Object.class的方法,已经toString、hashCode、equals方法
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(methodName, parameterTypes, args);
}

}

RpcInvocationHandler有个细节要处理,排除掉所有定义在Object.class中的方法,比如Object#wait()Object#notify()等,还要排除invoker中重写的toStringhashCodeequals方法。

在上面的RpcInvocationHandler构造方法中我们传入了一个叫做RpcInvoker(RPC调用者?)的对象,其实它封装了一系列的RPC调用的内部处理细节(当然现在里面其实没做太多的事情。。。)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
* RPC调用者???
*/
public class RpcInvoker<T> {

/**
* RPC客户端
*/
private RpcClient rpcClient;

/**
* 接口服务类
*/
private Class<T> interfaceClass;

/**
* 构造一个RpcInvoker
*
* @param host RPC服务器端监听host
* @param port RPC服务器端监听的port
* @param interfaceClass 目标服务接口类
*/
public RpcInvoker(String host, int port, Class<T> interfaceClass) {
this.rpcClient = new RpcClient(host, port);
this.interfaceClass = interfaceClass;
}

/**
* @param methodName 方法名
* @param parameterTypes 参数类型
* @param args 方法参数
* @return 结果
* @throws Throwable 服务端及客户端异常,包括RPC服务抛出的异常
*/
public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args)
throws Throwable {
RpcResponse rpcResponse = null;
try {
// 实例化一个RpcRequest
RpcRequest rpcRequest = new RpcRequest().newRpcRequest(this.interfaceClass.getName(), methodName,
parameterTypes, args);
// 通过RPC Client将数据发送到RPC Server
this.rpcClient.writeData(rpcRequest);
// 从RPC Server读取响应结果
rpcResponse = (RpcResponse) this.rpcClient.readData();
} finally {
this.rpcClient.close();
}
// 异常处理
if (null == rpcResponse) {
throw new RpcException("RPC调用为获取到结果");
}
if (null != rpcResponse.getThrowable()) {
throw rpcResponse.getThrowable();
}
// 返回真实结果给服务调用方
return rpcResponse.getResponse();
}

}


/**
* RPC客户端,负责收发数据,此处使用短连接,及每一次RPC调用都是一个新的Socket
*/
public class RpcClient {

private Socket socket;

private ObjectOutputStream out;

private ObjectInputStream in;

public RpcClient(String host, int port) {
this.socket = new Socket();
try {
this.socket.connect(new InetSocketAddress(host, port), 3000);
} catch (Exception e) {
throw new RpcNetworkException("连接服务器[" + host + ":" + port + "]异常", e);
}
}

/**
* 写数据
*
* @param object 数据
* @throws IOException 写出异常
*/
public void writeData(Object object) throws IOException {
this.out = new ObjectOutputStream(this.socket.getOutputStream());
this.out.writeObject(object);
this.out.flush();
}

/**
* 读数据
*
* @return 读去结果
* @throws IOException 读取异常
* @throws ClassNotFoundException 返回的结果的class在客户端不存在
*/
public Object readData() throws IOException, ClassNotFoundException {
this.in = new ObjectInputStream(this.socket.getInputStream());
return this.in.readObject();
}

public void close() {
IOUtils.close(this.in, this.out, this.socket);
}

}

我们以HelloService这个RPC接口服务为例来使用一下这个简单的RPC实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Startup {

public static void main(String[] args) {
// 实例化一个RpcInvoker
RpcInvoker<HelloService> invoker = new RpcInvoker<>("localhost", 10210, HelloService.class);
// 使用JDK动态代理来得到HelloService
HelloService helloService =
(HelloService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{HelloService.class}, new RpcInvocationHandler(invoker));
// 调用sayHello方法
final String result = helloService.sayHello("Simple RPC");
// 打印结果
System.out.println("结果:" + result);
}
}

数据是怎样交互的

从上面RequestHandler中可以看出,读取数据的时用的ObjectInputStream读取,直接将对象装换成一个RpcRequest,写出数据时也是将结果包装成一个RpcResponse用ObjectOutputStream写出,那么这两个类具体有什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// ============================== RpcRequest ==============================
/**
* RPC请求,网络传输需要实现Serializable接口
*/
public class RpcRequest implements Serializable {

private static final long serialVersionUID = -9213158787762981233L;

/**
* 目标接口类
*/
private String interfaceClassName;

/**
* 目标接口方法名
*/
private String methodName;

/**
* 目标接口方法参数类型
*/
private Class<?>[] parameterTypes;

/**
* 目标接口方法参数
*/
private Object[] arguments;

// 省略getter setter方法
}


// ============================== RpcResponse ==============================
/**
* RPC响应,网络传输需要实现Serializable接口
*/
public class RpcResponse implements Serializable {

private static final long serialVersionUID = -5573289211420999714L;

/**
* 响应结果
*/
private Object response;

/**
* 服务端的异常,包括服务方法抛出的异常
*/
private Throwable throwable;

/**
* @return the response
*/
public Object getResponse() {
return response;
}

/**
* @param response the response to set
*/
public void setResponse(Object response) {
this.response = response;
}

/**
* @return the throwable
*/
public Throwable getThrowable() {
return throwable;
}

/**
* @param throwable the throwable to set
*/
public void setThrowable(Throwable throwable) {
this.throwable = throwable;
}

}

RpcRequest中的interfaceClassNamemethodNameparameterTypes三个成员属性就能确定一个服务类的方法,这三者可被称作“RPC服务三元组”。而arguments则承载着这个方法的调用参数。

RpcResponse则有repsonsethrowable两个成员属性,前者是RPC服务真实的调用结果,后者则包括了一系列的异常,如客户端异常、服务端异常、RPC服务抛出的异常。

客户端将RpcRequest序列化后通过网络传输到服务端,服务端接收到RpcRequest,通过“RPC服务三元组”找到目标服务方法,执行此方法得到结果,并将调用结果包装成RpcResponse序列化后写回给客户端,客户端接收数据后获取调用结果,就能得到RPC调用的结果。

据我有限的知道的几款RPC框架如Dubbo、Motan、sofa-rpc等都是基于此种最基本的形式来完成远程调用。

总结

通过上面的代码我们基本上完成了一个脆弱的RPC框架

当然这是一个v0.0.1版本的RPC框架,我们还有很多事情需要做,比如NIO、服务发现、异常处理、资源控制、优雅关机等等一系列问题。

这些问题容我来慢慢的各个击破。

下次再见。。。


  RPC
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×