RPC框架

本文最后更新于 2024年5月13日 晚上

一、基础概念

  • RPC(Remote Procedure Call)即远程过程调用,允许一台计算机调用另一台计算机上的程序得到结果,而代码中不需要做额外的编程,就像在本地调用一样。
  • 所以可以把RPC看作是一种网络服务,网络传输函数调用信息,远端运行对应函数,然后再网络返回运行结果。这是一个很基本的运行模式,但是具体的实现方案却是多种多样的,但主要包含这些基本部分:
  1. 网络编程接口和协程。几乎所有的网络编程都会用多路复用,典型的就是使用epoll;而解决这种基于事件的最好编程模式是使用协程,用顺序逻辑写异步处理代码。
  2. 服务器架构。作为RPC服务器需要考虑如何设计出效率更高、吞吐量更高的服务器,这就是服务器机构设计的意义所在。经过多年的发展从最简单的多进程、多线程,到后来的多路复用异步编程,再到协程设计都是服务器架构设计的提升。
  3. 分发、动态代理。服务器的设计思路一般是每次被远程调用都运行一个相同的dispatch函数,然后在分发函数的内部去根据调用方法执行不同的函数。比如用stub拿到对应的方法名和参数类型,通过反射的方式找到方法,然后调用相应的方法并将结果返回给stub
  4. 网络协议。即客户端和服务端之间通信选用的协议,传输层是使用TCP/UDP,然后在应用层可以选择HTTP等。
  5. 序列化。远程函数的调用的时候需要传输参数和运行结果,这里需要将代码中的结构体或者对象进行序列化和反序列化,有protobuf这样的工具帮助生成代码。
  6. 服务注册。

二、PhxRPC

  • PhxRPC项目是一个完整的RPC框架,包括协程模块、服务器模块、网络协议模块、序列化模块等。
  • 协程是使用boost::ucontext实现的,并且是无栈协程。如果想要创建新的协程需要在主协程中创建,不能在协程内创建新的协程。具体来说代码中通过一个AddTask函数创建协程,但这里只是将该协程加入到队列中,需要等到主协程的调度循环中取出这个协程并开始运行。这里和libco中的设计不太一样,libco是在创建协程之后立刻运行,但这就需要协程栈的支持了,必须要退回到上一级的协程。协程是一个非常大的话题,详细见协程库
  • 网络协议上使用的HTTP,也属于比较常规的内容,但是注意HTTP的接口在实现时需要考虑是在协程上运行的,所以应该使用重新定义的网络读写接口。

2.1 基础框架

RPC架构.png
  • PhxRPC的服务器架构有核心三个角色,分别是HshaServer, HshaServerUnit, Worker,图中是用虚线框表示的,他们都运行在各自的线程中(即每个虚线框都表示一个线程);存在UThreadEpollScheduler单元的线程即表示在这个线程中存在协程调度单元,这个线程是以协程的模式运行的。
  • HshaServer主要用于监听新的TCP链接,HshaServerUnit处理所有RPC请求响应相关的网络读写,而下方的worker用于实际执行用户自定义的业务逻辑。

2.2 核心流程

PhxRPC流程图.png

2.2.1 HshaServer

  • HshaServer的职责是监听Socket端口,然后建立与客户端的连接,并将连接的fd传给HshaServerUnit进行后续处理。

处理流程

  • 首先HshaServer会在构造函数里就创建多个HshaServerUnit,用于在后台处理各种事件,创建的时候就会启动线程。使用链表的方式来管理所有的HshaServerUnit
  • 监听的工作是HshaServerAcceptor完成的,处理流程是先建立并绑定socket端口,然后通过死循环的方式不断获得新到来的连接。
  • 接着会从链表中选择一个HshaServerUnit来处理这次请求,这里是可以考虑使用一些负载均衡策略的,PhxRPC的实现方式用了简单的轮询方案。
  • 最后是调用选定的HshaServerUnit的方法,将这个待处理的fd放入其内部的HshaServerIO的一个队列中,再通过调用该HshaServerUnit内部的UThreadEpollScheduler::NotifyEpoll方法唤醒线程进行处理。

线程唤醒问题

  • HshaSercerHshaServerUnit其实运行在不同的线程中的,当我们在HshaServer流程中添加一个任务之后想要立刻唤醒HshaServerUnit所在的线程立刻处理,但因为HshaServerUnit运行在协程模式下,目前该线程极有可能在epoll_wait等待状态。
  • 所以需要一个机制将其从等待状态唤醒,PhxRPC选择的是创建一个管道,然后将管道也加入到epoll中进行轮询,所以当另外的线程想要唤醒自己的时候,只需要向管道中写入新的数据就可以了。这便是NotifyEpoll的实现方案。

2.2.2 HshaServerUnit

  • HshaServerUnit是用来处理RPC的网络数据读写的,在承接到HshaServer传来的socket连接之后需要先读出请求信息,然后将请求通过DataFlow发送给Worker进行处理;等待Worker处理完成之后需要再从DataFlow中取出返回信息,并将返回的数据写入socket
  • 可以发现HshaServerUnit是一个中间层的角色,只汇总处理Socket IO的问题,并执行用户定义的函数。并且一个HshaServerUnit可以管理多个Worker,这以为着多个WorkerRPC的网络传输上都是汇总给一个线程来处理的。

处理流程

  • HshaServerIO::HandlerAcceptedFd函数是读取队列中缓存的socket连接,然后创建协程处理这个socket相关的网络数据读写,注册的函数是HshaServerIO::IOFunc
  • IOFunc的流程是先读数据,解析HTTP协议并生成request结构体;然后将其放入DataFlow中,并唤醒一个Worker来处理请求。
  • 在通过WokerPool来唤醒Woker时,也是可以选择唤醒谁来处理的,可以有负载均衡的方案,但PhxRPC依然使用的轮询方案来处理。但不同的是,所有任务都被放在了DataFlow中,所有的worker对任务是一种竞争关系,不一定是唤醒的Worker拿到这个任务。这个和HshaServerUnit获取socket连接还不太一样,这个是直接写入了HshaServerUnit内部的队列,所以是不存在竞争关系的。
  • 然后当前协程IOFunc被挂起,等待任务处理完成之后恢复运行(具体是等ActiveSocketFunc唤醒准备好的协程),恢复之后将结果写入socket,任务就完成了。
  • HshaServerIO::ActiveSocketFunc是从DataFlow当中读取完成的response,这里面保存了socket信息以及协程ID,于是就可以通过协程ID恢复对应的协程了。

2.2.3 Worker

  • Worker::HandlerNewRequestFunc函数用于从DataFlow中读取数据,并生成对应的处理协程,注册函数Worker::UThreadFunc;这个函数内部就是调用用户自定义的处理函数,并且将response写入到DataFlow中,最好还需要唤醒HshaServerUnit
  • 一开始看到Worker是多线程实现的,在想为啥处理函数的也需要用协程来组织,毕竟处理函数一般都是消耗CPU来处理,并不存在阻塞浪费资源的情况。但是这是一个RPC服务器,我们在处理某个请求的时候是有可能需要通过网络去访问别的服务的,比如mysql, redis, rpc_server等,所以这里的设计也是用协程。

2.3 序列化工具:ProtoBuf

参考资料:

C++ protobuf示例

ProtoBuf 入门教程

  • Protocol Buffers是一种语言无关、平台无关、可扩展的序列化结构数据的方法,它可用于(数据)通信协议、数据存储等。
  • 灵活,高效,自动化机制的结构数据序列化方法,可类比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更简单,json\xml都是基于文本格式,protobuf是二进制格式。

2.3.1 安装

  • 官网下载项目源码:https://github.com/protocolbuffers/protobuf.git

  • 下载依赖的第三方库项目:git submodule update --init --recursive

  • 使用CMake编译安装,创建一个build文件夹,然后执行命令cmake -DCMAKE_BUILD_TYPE=Release -Dprotobuf_BUILD_TESTS=OFF -DABSL_PROPAGATE_CXX_STD=ON ..

  • 接着通过make编译并安装项目即可make & sudo make install

  • 安装操作将会把三种文件放置到三个地方,分别是:

    • 生成代码的可执行文件放在/usr/local/bin
    • 生成代码需要引用的头文件在/usr/local/include
    • 编译链接的-lprotobuf库文件放置在/usr/local/lib

2.3.2 消息定义

  • 首先所有数据结构的定义都放在单独的.proto文件中,结构定义规则如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
message xxx {
// 字段规则:required -> 字段只能也必须出现 1 次
// 字段规则:optional -> 字段可出现 0 次或 1 次
// 字段规则:repeated -> 字段可出现任意多次(包括 0)
// 类型:int32、int64、sint32、sint64、string、32-bit ....
// 字段编号:0 ~ 536870911(除去 19000 到 19999 之间的数字)
[字段规则] [类型] [名称] = [字段编号];
}


message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
}
  • 此外可以在.proto文件中通过定义package xxx来设置生成的代码的namespace
  • 下面定义了protobuf的格式与C++类型的对应表
.protoType Notes C++Type
double double
float float
int32 使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代 int32
uint32 使用变长编码 uint32
uint64 使用变长编码 uint64
sint32 使用变长编码,这些编码在负值时比int32高效的多 int32
sint64 使用变长编码,有符号的整型值。编码时比通常的int64高效。 int64
fixed32 总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。 uint32
fixed64 总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。 uint64
sfixed32 总是4个字节 int32
sfixed64 总是8个字节 int64
bool bool
string 一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。 string
bytes 可能包含任意顺序的字节数据。 string
  • 其他复合类型的定义方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
syntax = "proto3";//指定版本信息,不指定会报错

// 枚举:
enum PhoneType //枚举消息类型,使用enum关键词定义,一个电话类型的枚举类型
{
MOBILE = 0; //proto3版本中,首成员必须为0,成员不应有相同的值
HOME = 1;
WORK = 2;
}

message PhoneNumber
{
string number = 1; // 电话号码字段
PhoneType type = 2; // 电话类型字段,电话类型使用PhoneType枚举类型
}


// 数组:
message Msg {
// 只要使用repeated标记类型定义,就表示数组类型。
repeated int32 arrays = 1;
repeated string names = 2;
}


// map:
syntax = "proto3";
message Product
{
string name = 1; // 商品名
// 定义一个k/v类型,key是string类型,value也是string类型
map<string, string> attrs = 2; // 商品属性,键值对
}
  • 还有一种是消息嵌套的定义方式,可以引用别的消息结构用于定义自己。也可以直接嵌套定义在自己内部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1.引用其他消息类型的用法
// 定义Result消息
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3; // 字符串数组类型
}

// 定义SearchResponse消息
message SearchResponse {
// 引用上面定义的Result消息类型,作为results字段的类型,这里就算是定义在别的文件中也是可以通过import导入的
repeated Result results = 1; // repeated关键词标记,说明results字段是一个数组
}

// 2.消息嵌套
message SearchResponse {
// 嵌套消息定义
message Result {
string url = 1;
string title = 2;
repeated string snippets = 3;
}
// 引用嵌套的消息定义
repeated Result results = 1;
}

2.3.3 代码生成

  • 这一步就是按照我们的定义规则生成对应的代码,以便项目使用,通过protoc程序完成代码生成
1
2
3
4
5
6
7
8
// $SRC_DIR: .proto 所在的源目录
// --cpp_out: 生成 c++ 代码
// $DST_DIR: 生成代码的目标目录
// xxx.proto: 要针对哪个 proto 文件生成接口代码

protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/xxx.proto

protoc ./person.proto --cpp_out=./
  • 编译完成之后会生成xxx.pb.hxxx.pb.cpp两个文件,项目中通过引入这些生成的源文件来对数据进行序列化和反序列化。

2.3.4 库函数使用

  • 关于用户自定义消息的各种操作是通过protobuf库函数对外提供支持的,比如封装数据流读取到对象,将对象输出到流、字符串等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Person p;
p.set_name("test");
p.set_id(100);
p.set_email("a.iabc.com");

// 将p文本信息写入文件
auto *output = new google::protobuf::io::OstreamOutputStream(&fw);
google::protobuf::TextFormat::Print(p, output);
// 将p文本信息保存到字符串
std::string str1;
google::protobuf::TextFormat::PrintToString(p, &str1);

// 将p的二进制信息保存到字符串,序列化
std::string str;
p.SerializeToString(&str);
// 将二进制数据转换到对象p1中,反序列化
Person p1;
p1.ParseFromString(str);

RPC框架
https://lluvialuo.github.io/2024/05/13/RPC框架/
作者
Lluvia Luo
发布于
2024年5月13日
许可协议