手机版 欢迎访问创业网 - 全球最顶尖的创业信息平台(91cyl.com)网站

当前位置: 主页 > 运营

RPC分布式网络通信框架项目

时间:2024-03-29 11:03|来源:创业网 - 全球最顶尖的创业信息平台|作者:创业网|点击:

文章目录

对比单机聊天服务器、集群聊天服务器以及分布式聊天服务器

(1) 单机聊天服务器

首先让我们来看单机聊天服务器会遇到的一些性能瓶颈的问题:

  1. 聊天服务器所能承受的用户的并发量受限于硬件资源;
  2. 任何模块的修改,都会导致整个项目代码重新进行编译和部署;
  3. 系统中,有些模块是属于CPU密集型的,有些模块是属于I/O密集型的,造成各模块对于硬件资源的需求是不相同的。所以,将各模块部署在一台服务器上时,没办法按照各模块对资源的需求部署在特定的硬件资源上。

(2)集群聊天服务器

集群聊天服务器相对于单机聊天服务器,做了哪些优化,还存在哪些性能瓶颈的问题?
优点:
用户的并发量提升、部署集群服务器比较简单。
缺点
1、项目代码还是需要整体重新编译,而且需要进行多次部署;
2、对于某些模块根本不需要高并发(比如上方服务器中的后台管理模块),而使用集群聊天服务器后,在每一台服务器上都部署了相同的模块,可能会造成服务器资源的浪费。

集群:每一台服务器独立运行一个工程的所有模块。
分布式:一个工程拆分了很多模块,每一个模块独立部署运行在一个或多台服务器主机上,所有服务器协同工作共同提供服务,每一台服务器称作分布式的一个节点,根据节点的并发要求,对一个节点可以再做节点模块集群部署。

当引入了分布式部署聊天服务器的概念后,我们把每个模块按需部署在一台或多台服务器上,使得服务器的资源利用更加充分。同时,如果只是改动了某一个模块的代码,不用将所有的服务器代码都重新编译和部署,只需要重新编译相对应的模块代码。

但是,引入分布式服务器后,会遇到哪些难题需要我们解决呢?

  1. 我们应该如何对大系统的软件模块进行划分呢?怎么划分才能使得各模块的重复代码更少呢?
  2. 当模块被部署在不同的服务器上时,各模块之间如何进行访问呢?比如下图的机器1上的模块如何去调用机器2上的模块的一个业务方法呢?机器1上的模块进程1怎么调用机器1上的模块进程2里面的一个业务方法呢?


第1个问题通过软件设计师的经验来进行模块的划分,而第2个问题则是通过我们设计的RPC分布式网络框架来实现远程方法的调用,使得客户端基于此框架,能让不同网络结点之间的服务调用像调用本地服务一样简单。


RPC通信原理

RPC(Remote Procedure Call Protocol)远程过程调用协议

黄色部分:设计rpc方法参数的打包和解析,也就是数据的序列化和反序列化,使用Protobuf
绿色部分:网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果,使用muduo网络库和zookeeper服务配置中心(专门做服务发现)。
mprpc框架主要包含以上两个部分的内容。


使用Protobuf做数据的序列化,相比较于json,有哪些优点?

  1. Protobuf 是使用二进制存储数据的,而xmljson都是文本存储的;
  2. Protobuf 不需要存储额外的信息,而json是以键值对的方式来存储数据。

环境配置使用

项目代码工程目录

bin:可执行文件
build:项目编译文件
lib:项目库文件
src:源文件
test:测试代码
example:框架代码使用范例
CMakeLists.txt:顶层的cmake文件
README.md:项目自述文件
autobuild.sh:一键编译脚本

vscode远程开发Linux项目
muduo网络库编程示例
CMake构建项目集成编译环境
Linux环境下搭建muduo网络库

muduo库的安装需要依赖boost库,具体安装配置步骤参考博客:
Linux环境下搭建muduo网络库

网络I/O模型介绍
  • accept + read/write
    不是并发服务器
  • accept + fork - process-pre-connection
    适合并发连接数不大,计算任务工作量大于fork的开销
  • accept + thread thread-pre-connection
    比方案2的开销小了一点,但是并发造成线程堆积过多
  • muduo的设计:reactors in threads - one loop per thread
    方案的特点是one loop per thread,有一个main reactor(I/O)负载accept连接,然后把连接分发到某个sub reactor(Worker),该连接的所用操作都在那个sub reactor所处的线程中完成,多个连接可能被分派到多个线程中,以充分利用CPU。
    如果有过多的耗费CPU I/O的计算任务,可以创建新的线程专门处理耗时的计算任务。
  • reactors in process - one loop pre process
    nginx服务器的网络模块设计,基于进程设计,采用多个Reactors充当I/O进程和工作进程,通过一把accept锁,完美解决多个Reactors的“惊群现象”。

Protobuf安装配置

protobuf(protocol buffer)是google 的一种数据交换的格式,它独立于平台语言。
google 提供了protobuf多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言的编译器以及库文件。
由于它是一种二进制的格式,比使用 xml(20倍) 、json(10倍)进行数据交换快许多。可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。

ubuntu protobuf环境搭建

见项目资料下载地址或者在github源代码下载地址:https://github.com/google/protobuf
源码包中的src/README.md,有详细的安装说明,安装过程如下:
1、解压压缩包:unzip protobuf-master.zip
2、进入解压后的文件夹:cd protobuf-master
3、安装所需工具:sudo apt-get install autoconf automake libtool curl make g++ unzip
4、自动生成configure配置文件:./autogen.sh
5、配置环境:./configure
6、编译源代码(时间比较长):make
7、安装:sudo make install
8、刷新动态库:sudo ldconfig


protobuf 实践讲解

首先在服务端编写proto配置文件,比如测试代码中的test.proto,配置文件中包含需要需要进行序列化的请求消息和响应消息,以及在protobuf中定义了描述rpc方法的服务类型,传入相应的rpc请求方法名以及相应的参数,返回参数。

syntax = "proto3"; // 声明了protobuf的版本

package fixbug; // 声明了代码所在的包(对于C++来说是namespace)

// 定义下面的选项,表示生成service服务类和rpc方法描述,默认不生成
option cc_generic_services = true;

message ResultCode
{
    int32 errcode = 1;
    bytes errmsg = 2;
}

// 数据   列表   映射表
// 定义登录请求消息类型  name   pwd
message LoginRequest
{
    bytes name = 1;
    bytes pwd = 2;
}

// 定义登录响应消息类型
message LoginResponse
{
    ResultCode result = 1;
    bool success = 2;
}

message GetFriendListsRequest
{
    uint32 userid = 1;
}

message User
{
    bytes name = 1;
    uint32 age = 2;
    enum Sex
    {
        MAN = 0;
        WOMAN = 1;
    }
    Sex sex = 3;
}

message GetFriendListsResponse
{
    ResultCode result = 1;
    repeated User friend_list = 2;  // 定义了一个列表类型
}

// 在protobuf里面怎么定义描述rpc方法的类型 - service
service UserServiceRpc
{
    rpc Login(LoginRequest) returns(LoginResponse);
    rpc GetFriendLists(GetFriendListsRequest) returns(GetFriendListsResponse);
}

通过在终端中执行protoc test.proto --cpp_out=./即可在当前文件夹中生成test.pb.cctest.pb.h,其中massage以及service生成了相应的类,且都是通过继承得到的,class LoginRequest: public::google::protobuf::Message,而UserServiceRpc服务生成了两个类:class UserServiceRpc: public google::protobuf::Service提供给服务提供者进行调用、class UserService_Stub: public UserServiceRpc提供给服务消费者进行调用。如下所示:

在服务提供者方,由于生成的UerServiceRpc类从public google::protobuf::Service继承而来,而类中的Login()方法和GetFriendLists()方法都是虚函数,因此服务提供者需要对相应的方法进行重写。
在服务消费者方,我们可以发现生成的UserServiceRpc_Stub类没有提供默认的构造函数,而只提供了一个含有google::protobuf::RpcChannel* channe参数的构造函数,而且类中的Login()方法和GetFriendLists方法也都是虚函数,在两个方法的底层都调用了RpcChannel类的CallMethod方法。由于RpcChannel 类中的CallMethod方法是一个纯虚函数,因此我们需要重新定义一个类MyRpcChannel继承RpcChannel,在派生类MyRpcChannel中对继承而来的CallMethod方法进行重写。在构造UserServiceRpc_Stub类时,直接向其传递一个MyRpcChannel对象,即可调用派生类中重写函数。


如何将本地服务发布成rpc服务?

用户要将本地服务发布成一个可分布式部署的rpc服务,首先需要通过在user.proto配置文件中写出描述这个rpc方法的方法名字、参数类型以及返回值的响应类型。然后通过在终端执行protoc user.proto --cpp_out=./生成user.pb.ccuser.pb.h,这样就相当于rpc调用方和rpc提供方就生成了一个协议。在rpc方法的提供方,我们可以将本地服务UserServiceUerServiceRpc继承而来,然后重写Login()方法,重写好之后由框架对Login()方法进行直接调用。最后在服务发布方初始化调用的框架MprpcApplication::Init(argc, argv);,定义一个rpc网络服务对象RpcProvider provider;,把UserService对象发布到rpc节点上provider.NotifyService(new UserService());,启动一个rpc服务发布节点provider.Run();Run以后,进程进入阻塞状态,等待远程的rpc调用请求,这样就完成了将本地服务发布成rpc远程服务。

example/user.proto

syntax = "proto3";

package fixbug;

option cc_generic_services = true;

message ResultCode
{
    int32 errcode = 1; 
    bytes errmsg = 2;
}

message LoginRequest
{
    bytes name = 1;
    bytes pwd = 2;
}

message LoginResponse
{
    ResultCode result = 1;
    bool sucess = 2;
}

message RegisterRequest
{
    uint32 id = 1;
    bytes name = 2;
    bytes pwd = 3;
}

message RegisterResponse
{
    ResultCode result = 1;
    bool sucess = 2;
}

service UserServiceRpc
{
    rpc Login(LoginRequest) returns(LoginResponse);
    rpc Register(RegisterRequest) returns(RegisterResponse);
}

example/callee/usrservice.cc

#include 
#include 
#include "user.pb.h"
#include "mprpcapplication.h"
#include "rpcprovider.h"

/*
UserService原来是一个本地服务,提供了两个进程内的本地方法,Login和GetFriendLists
*/
class UserService : public fixbug::UserServiceRpc // 使用在rpc服务发布端(rpc服务提供者)
{
public:
    bool Login(std::string name, std::string pwd)
    {
        std::cout << "doing local service: Login" << std::endl;
        std::cout << "name:" << name << " pwd:" << pwd << std::endl;  
        return false;
    }

    bool Register(uint32_t id, std::string name, std::string pwd)
    {
        std::cout << "doing local service: Register" << std::endl;
        std::cout << "id:" << id << "name:" << name << " pwd:" << pwd << std::endl;
        return true;
    }

    /*
    重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的
    1. caller   ===>   Login(LoginRequest)  => muduo =>   callee 
    2. callee   ===>    Login(LoginRequest)  => 交到下面重写的这个Login方法上了
    */
    void Login(::google::protobuf::RpcController* controller,
                       const ::fixbug::LoginRequest* request,
                       ::fixbug::LoginResponse* response,
                       ::google::protobuf::Closure* done)
    {
        // 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务
        std::string name = request->name();
        std::string pwd = request->pwd();

        // 做本地业务
        bool login_result = Login(name, pwd); 

        // 把响应写入  包括错误码、错误消息、返回值
        fixbug::ResultCode *code = response->mutable_result();
        code->set_errcode(0);
        code->set_errmsg("");
        response->set_sucess(login_result);

        // 执行回调操作   执行响应对象数据的序列化和网络发送(都是由框架来完成的)
        done->Run();
    }

    void Register(::google::protobuf::RpcController* controller,
                       const ::fixbug::RegisterRequest* request,
                       ::fixbug::RegisterResponse* response,
                       ::google::protobuf::Closure* done)
    {
        uint32_t id = request->id();
        std::string name = request->name();
        std::string pwd = request->pwd();

        bool ret = Register(id, name, pwd);

        response->mutable_result()->set_errcode(0);
        response->mutable_result()->set_errmsg("");
        response->set_sucess(ret);

        done->Run();
    }
};

int main(int argc, char **argv)
{
    // 调用框架的初始化操作
    MprpcApplication::Init(argc, argv);

    // provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
    RpcProvider provider;
    provider.NotifyService(new UserService());

    // 启动一个rpc服务发布节点   Run以后,进程进入阻塞状态,等待远程的rpc调用请求
    provider.Run();

    return 0;
}

mprpc框架基础类设计

当完成了rpc服务提供方的代码后,接下来我们就来看看如何设计和编写的底层框架的代码的,首先关注框架的初始化操作MprpcApplication::Init(argc, argv);,我们在src目录下创建mprpcapplication.hmprpcapplication.cc
mprpcapplication.h

#pragma once

#include "mprpcconfig.h"
#include "mprpcchannel.h"
#include "mprpccontroller.h"

// mprpc框架的基础类,负责框架的一些初始化操作
class MprpcApplication
{
public:
    static void Init(int argc, char **argv);
    static MprpcApplication& GetInstance();
    static MprpcConfig& GetConfig();
private:
    static MprpcConfig m_config;

    MprpcApplication(){}
    MprpcApplication(const MprpcApplication&) = delete;
    MprpcApplication(MprpcApplication&&) = delete;
};

mprpcapplication.cc

#include "mprpcapplication.h"
#include 
#include 
#include 

MprpcConfig MprpcApplication::m_config;

void ShowArgsHelp()
{
    std::cout<<"format: command -i " << std::endl;
}

void MprpcApplication::Init(int argc, char **argv)
{
    if (argc < 2)
    {
        ShowArgsHelp();
        exit(EXIT_FAILURE);
    }

    int c = 0;
    std::string config_file;
    while((c = getopt(argc, argv, "i:")) != -1)
    {
        switch (c)
        {
        case 'i':
            config_file = optarg;
            break;
        case '?':
            ShowArgsHelp();
            exit(EXIT_FAILURE);
        case ':':
            ShowArgsHelp();
            exit(EXIT_FAILURE);
        default:
            break;
        }
    }

    // 开始加载配置文件了 rpcserver_ip=  rpcserver_port   zookeeper_ip=  zookepper_port=
    m_config.LoadConfigFile(config_file.c_str());

    // std::cout << "rpcserverip:" << m_config.Load("rpcserverip") << std::endl;
    // std::cout << "rpcserverport:" << m_config.Load("rpcserverport") << std::endl;
    // std::cout << "zookeeperip:" << m_config.Load("zookeeperip") << std::endl;
    // std::cout << "zookeeperport:" << m_config.Load("zookeeperport") << std::endl;
}

MprpcApplication& MprpcApplication::GetInstance()
{
    static MprpcApplication app;
    return app;
}

MprpcConfig& MprpcApplication::GetConfig()
{
    return m_config;
}

mprpc框架的配置文件加载

当获取到配置文件以后,加载配置文件的函数LoadConfigFile()以及查询配置项信息的函数Load()mprpcconfig.hmprpcconfig.cc中定义和实现。
mprpcconfig.h

#pragma once

#include 
#include 

// rpcserverip   rpcserverport    zookeeperip   zookeeperport
// 框架读取配置文件类
class MprpcConfig
{
public:
    // 负责解析加载配置文件
    void LoadConfigFile(const char *config_file);
    // 查询配置项信息
    std::string Load(const std::string &key);
private:
    std::unordered_map<std::string, std::string> m_configMap;
    // 去掉字符串前后的空格
    void Trim(std::string &src_buf);
};

mprpcconfig.cc

#include "mprpcconfig.h"

#include 
#include 

// 负责解析加载配置文件
void MprpcConfig::LoadConfigFile(const char *config_file)
{
    FILE *pf = fopen(config_file, "r");
    if (nullptr == pf)
    {
        std::cout << config_file << " is note exist!" << std::endl;
        exit(EXIT_FAILURE);
    }

    // 1.注释   2.正确的配置项 =    3.去掉开头的多余的空格 
    while(!feof(pf))
    {
        char buf[512] = {0};
        fgets(buf, 512, pf);

        // 去掉字符串前面多余的空格
        std::string read_buf(buf);
        Trim(read_buf);

        // 判断#的注释
        if (read_buf[0] == '#' || read_buf.empty())
        {
            continue;
        }

        // 解析配置项
        int idx = read_buf.find('=');
        if (idx == -1)
        {
            // 配置项不合法
            continue;
        }

        std::string key;
        std::string value;
        key = read_buf.substr(0, idx);
        Trim(key);
        // rpcserverip=127.0.0.1\n
        int endidx = read_buf.find('\n', idx);
        value = read_buf.substr(idx+1, endidx-idx-1);
        Trim(value);
        m_configMap.insert({key, value});
    }

    fclose(pf);
}

// 查询配置项信息
std::string MprpcConfig::Load(const std::string &key)
{
    auto it = m_configMap.find(key);
    if (it == m_configMap.end())
    {
        return "";
    }
    return it->second;
}

// 去掉字符串前后的空格
void MprpcConfig::Trim(std::string &src_buf)
{
    int idx = src_buf.find_first_not_of(' ');
    if (idx != -1)
    {
        // 说明字符串前面有空格
        src_buf = src_buf.substr(idx, src_buf.size()-idx);
    }
    // 去掉字符串后面多余的空格
    idx = src_buf.find_last_not_of(' ');
    if (idx != -1)
    {
        // 说明字符串后面有空格
        src_buf = src_buf.substr(0, idx+1);
    }
}

开发RpcProvider的网络服务

  • 首先服务提供方会向RpcProvider上注册服务对象UserService和对应的服务方法,然后在RpcProvider::NotifyService()中通过protobuf提供的抽象层的servicemethod,把服务对象和它的服务方法记录在了一个m_serviceMapMap表当中。
  • 当服务提供方调用provider.Run()启动一个rpc服务发布节点以后,就相当于通过moduo网络库启动了一个epoll + 多线程的服务器,启动之后就可以接收远程客户端的连接,如果远程有新的连接到来,muduo库会回调RpcProvider::OnConnection方法,实际上OnConnection不需要做特殊的操作,只是当客户端断开时,服务端在OnConnection中通过conn->shutdown()把相应的连接断开,释放Socket资源。在RpcProvider类中最重要的就是OnMessage方法,它是已建立连接用户的读写事件回调,如果远程有一个rpc服务的调用请求,那么OnMessage方法就会响应。数据到达以后,按照与服务请求方协商好的数据格式header_size(4个字节) + header_str + args_str解析数据,最终解析得到service_namemethod_name以及请求参数数据args_str,然后通过抽象层动态生成请求request和响应response,给method方法的调用,绑定一个Closure的回调函数,然后在框架上调用业务的方法service->CallMethod(method, nullptr, request, response, done),至此业务中的Login()方法就会被框架调用,在Login()方法中首先从请求参数中获取相应的数据做本地业务,接着把响应消息写入response,然后再执行一个回调done->Run()。这个回调函数就会调用我们绑定的RpcProvider::SendRpcResponse方法,该方法用于序列化rpc的响应和网络发送,最后由rpcprovider主动断开连接conn->shutdown(),节省资源,给其他rpc客户端继续提供服务。


RPC方法的调用过程

Copyright © 2005-2024 创业网 版权所有 网站地图