目录etcd模块etcd的封装brpc模块brpc中Channel的封装brpc服务器brpc客户端etcdbrpc客户端和服务器的协作etcd模块由于etcd没有现成的封装好的类太过于麻烦了外面需要自己封装如下图片的功能。在项目中我们不需要构造etcd的服务器仅需要构造服务端进行服务发现和服务注册所以etcd的作用就是进行已知服务的发现和未知服务的注册这里仅仅支持在一个base目录下自定义服务路径的注册所以我们构造服务发现对象时需要指定base目录。etcd的封装namespace bite_im{ //服务注册客户端类 class Registry { public: using ptr std::shared_ptrRegistry; Registry(const std::string host): //通过服务器地址创建etcd服务器 _client(std::make_sharedetcd::Client(host)) , //创建 TTL3秒 的租约并启动自动续约如果进程崩溃租约会在3秒后过期etcd 自动删除注册的服务 _keep_alive(_client-leasekeepalive(3).get()), //从保活对象中获取租约ID _lease_id(_keep_alive-Lease()){} //取消保活连接释放租约资源 ~Registry() { _keep_alive-Cancel(); } bool registry(const std::string key, const std::string val) { //put(key, val, _lease_id)将键值对与租约绑定.get是为了阻塞等待结果 //key注册的服务名val服务地址 //将来可以通过根地址直接进行服务发现 auto resp _client-put(key, val, _lease_id).get(); //判断是否成功 if (resp.is_ok() false) { LOG_ERROR(注册数据失败{}, resp.error_message()); return false; } return true; } private: std::shared_ptretcd::Client _client; //etcd客户端对象不需要服务器 std::shared_ptretcd::KeepAlive _keep_alive; //租约对象用于自动续约 uint64_t _lease_id; //租约ID用于绑定键值对 }; //服务发现客户端类 class Discovery { public: using ptr std::shared_ptrDiscovery; using NotifyCallback std::functionvoid(std::string, std::string); //进行服务发现需要4个参数etcd服务器地址服务注册根目录新增服务回调下线服务回调 //hostetcd服务器地址 //basedir服务注册的根目录用于服务发现 //put_cb新增服务的回调函数参数为服务名和地址 //del_cb下线服务的回调函数参数为服务名和地址 Discovery(const std::string host, const std::string basedir, const NotifyCallback put_cb, const NotifyCallback del_cb): _client(std::make_sharedetcd::Client(host)) , _put_cb(put_cb), _del_cb(del_cb){ //先进行服务发现, 先获取到当前根目录以下子目录的已发现的全部节点 auto resp _client-ls(basedir).get(); if (resp.is_ok() false) { LOG_ERROR(获取服务信息数据失败{}, resp.error_message()); } //遍历全部数据调用上线函数进行处理(服务上线是为了机械能rpc调用主要进行构造channel) int sz resp.keys().size(); for (int i 0; i sz; i) { if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string()); } //然后进行事件监控监控数据发生的改变并调用回调进行处理 //watch(key, callback, recursive)监控key目录下的数据变化并调用回调函数处理 //basedir服务注册的根目录用于服务发现 //callback事件回调函数参数为etcd::Response对象 //recursive是否递归监控子目录的变化 _watcher std::make_sharedetcd::Watcher(*_client.get(), basedir, std::bind(Discovery::callback, this, std::placeholders::_1), true); } ~Discovery() { //取消事件监控 _watcher-Cancel(); } private: void callback(const etcd::Response resp) { if (resp.is_ok() false) { LOG_ERROR(收到一个错误的事件通知: {}, resp.error_message()); return; } //用于实时响应服务注册和下线 for (auto const ev : resp.events()) { //这个服务的类型是PUT就表示需要上线调用上线函数添加或者保存管道信息 if (ev.event_type() etcd::Event::EventType::PUT) { //ev.kv().key() // 键名如 /services/user-service/10.0.0.1:9000 //ev.kv().as_string() // 值如 10.0.0.1:9000 if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string()); LOG_DEBUG(新增服务{}-{}, ev.kv().key(), ev.kv().as_string()); //这个服务的类型是DELETE_就表示节点已经被删除需要下线, 调用下线函数删除节点管道信息 }else if (ev.event_type() etcd::Event::EventType::DELETE_) { //ev.prev_kv().key() // 被删除的键名 //ev.prev_kv().as_string() // 被删除的值 if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string()); LOG_DEBUG(下线服务{}-{}, ev.prev_kv().key(), ev.prev_kv().as_string()); } } } private: NotifyCallback _put_cb; //上线服务的回调函数 NotifyCallback _del_cb; //下线服务的回调函数 std::shared_ptretcd::Client _client; //etcd客户端对象不需要服务器 std::shared_ptretcd::Watcher _watcher; //事件监控对象 }; }只要感知到变换_watcher就会调用回调函数并自动传入变换的目录resp。brpc模块brpc中Channel的封装namespace bite_im { //1. 封装单个服务的信道管理类: class ServiceChannel { public: using ptr std::shared_ptrServiceChannel; //brpc::Channel using ChannelPtr std::shared_ptrbrpc::Channel; ServiceChannel(const std::string name): _service_name(name), _index(0){} //服务上线了一个节点则调用append新增信道 void append(const std::string host) { //创建一个 brpc 通信通道, 不需要任何参数的初始化 auto channel std::make_sharedbrpc::Channel(); //连接选项 brpc::ChannelOptions options; //连接超时-1 表示无限等待 options.connect_timeout_ms -1; //RPC 超时-1 表示无限等待 options.timeout_ms -1; //最大重试次数3次 options.max_retry 3; // 使用百度标准协议 options.protocol baidu_std; //根据 服务的host如 192.168.1.100:9000初始化连接 int ret channel-Init(host.c_str(), options); if (ret -1) { LOG_ERROR(初始化{}-{}信道失败!, _service_name, host); return; } std::unique_lockstd::mutex lock(_mutex); //建立通道与主机地址的映射关系显示主机已存在信道 _hosts.insert(std::make_pair(host, channel)); //添加信道 _channels.push_back(channel); } //服务下线了一个节点则调用remove释放信道 void remove(const std::string host) { std::unique_lockstd::mutex lock(_mutex); //查找并删除指定主机地址的一个信道的标识 auto it _hosts.find(host); if (it _hosts.end()) { LOG_WARN({}-{}节点删除信道时没有找到信道信息, _service_name, host); return; } //删除所有信道 for (auto vit _channels.begin(); vit ! _channels.end(); vit) { if (*vit it-second) { _channels.erase(vit); break; } } _hosts.erase(it); } //通过RR轮转策略获取一个Channel用于发起对应服务的Rpc调用 ChannelPtr choose() { std::unique_lockstd::mutex lock(_mutex); if (_channels.size() 0) { LOG_ERROR(当前没有能够提供 {} 服务的节点, _service_name); return ChannelPtr(); } int32_t idx _index % _channels.size(); return _channels[idx]; } private: std::mutex _mutex; int32_t _index; //当前轮转下标计数器 std::string _service_name;//服务名称 std::vectorChannelPtr _channels; //当前服务对应的信道集合 std::unordered_mapstd::string, ChannelPtr _hosts; //主机地址与信道映射关系 }; //总体的服务信道管理类 class ServiceManager { public: using ptr std::shared_ptrServiceManager; ServiceManager() {} //用RR轮转获取指定服务的节点信道 ServiceChannel::ChannelPtr choose(const std::string service_name) { std::unique_lockstd::mutex lock(_mutex); //假设原本的服务k值是/server/chat/instance这里信道管理的k值为/server/chat存储的全部都是这种形式 auto sit _services.find(service_name); if (sit _services.end()) { LOG_ERROR(当前没有能够提供 {} 服务的节点, service_name); return ServiceChannel::ChannelPtr(); } //RR轮转 return sit-second-choose(); } //先声明我关注哪些服务的上下线不关心的就不需要管理了 void declared(const std::string service_name) { std::unique_lockstd::mutex lock(_mutex); _follow_services.insert(service_name); } //onServiceOnline服务上线 //服务上线时调用的回调接口将服务节点管理起来 void onServiceOnline(const std::string service_instance, const std::string host) { //生成想要的k值形式 --- /server/chat/instance - /server/chat std::string service_name getServiceName(service_instance); ServiceChannel::ptr service; { std::unique_lockstd::mutex lock(_mutex); auto fit _follow_services.find(service_name); if (fit _follow_services.end()) { LOG_DEBUG({}-{} 服务上线了但是当前并不关心, service_name, host); return; } //先获取管理对象没有则创建有则添加节点 auto sit _services.find(service_name); if (sit _services.end()) { service std::make_sharedServiceChannel(service_name); _services.insert(std::make_pair(service_name, service)); }else { service sit-second; } } if (!service) { LOG_ERROR(新增 {} 服务管理节点失败, service_name); return ; } service-append(host); LOG_DEBUG({}-{} 服务上线新节点进行添加管理, service_name, host); } //onServiceOffline服务下线 //服务下线时调用的回调接口从服务信道管理中删除指定节点信道 void onServiceOffline(const std::string service_instance, const std::string host) { std::string service_name getServiceName(service_instance); ServiceChannel::ptr service; { std::unique_lockstd::mutex lock(_mutex); auto fit _follow_services.find(service_name); if (fit _follow_services.end()) { LOG_DEBUG({}-{} 服务下线了但是当前并不关心, service_name, host); return; } //先获取管理对象没有则创建有则添加节点 auto sit _services.find(service_name); if (sit _services.end()) { LOG_WARN(删除{}服务节点时没有找到管理对象, service_name); return; } service sit-second; } //我们仍保留着该节点的信道映射删除了信道不再对外提供服务了 service-remove(host); LOG_DEBUG({}-{} 服务下线节点进行删除管理, service_name, host); } private: //获取服务名称 std::string getServiceName(const std::string service_instance) { auto pos service_instance.find_last_of(/); if (pos std::string::npos) return service_instance; return service_instance.substr(0, pos); } private: std::mutex _mutex; std::unordered_setstd::string _follow_services; //当前关注的服务名称集合 std::unordered_mapstd::string, ServiceChannel::ptr _services; //服务名称对应的信道管理对象 }; }brpc服务器void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const string path ./data/) { _rpc_server make_sharedbrpc::Server(); //添加文件下载服务 FileServiceImpl* speech_service new FileServiceImpl(path); int ret _rpc_server-AddService(speech_service, brpc::SERVER_OWNS_SERVICE); if (ret -1) { LOG(ERROR) Fail to add service\n; abort(); } //设置brpc服务端参数 brpc::ServerOptions options; options.idle_timeout_sec timeout; options.num_threads num_threads; ret _rpc_server-Start(port, options); if (ret -1) { LOG(ERROR) Fail to start server\n; abort(); } }void make_reg_object(const string reg_host, const string service_name, const string access_host) { _reg_client make_sharedRegistry(reg_host); //注册服务 _reg_client-registry(service_name, access_host); }void start() { _rpc_server-RunUntilAskedToQuit(); }在.cc中调用fsb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads, FLAGS_storage_path); fsb.make_reg_object(FLAGS_registry_host, FLAGS_base_service FLAGS_instance_name, FLAGS_access_host); auto server fsb.build(); server-start(); return 0;我们用的是建造者模式brpc客户端//构造rpc信道管理对象 auto sm make_sharedbite_im::ServiceManager(); //声明要管理的服务 sm-declared(FLAGS_file_service); auto put_cb bind(bite_im::ServiceManager::onServiceOnline, sm.get(), placeholders::_1, placeholders::_2); auto del_cb bind(bite_im::ServiceManager::onServiceOffline, sm.get(), placeholders::_1, placeholders::_2); //构造服务发现对象 bite_im::Discovery::ptr dclient make_sharedbite_im::Discovery(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb); //通过rpc信道管理对象获取信道 channel sm-choose(FLAGS_file_service); if (!channel) { this_thread::sleep_for(chrono::seconds(1)); LOG_ERROR(获取{}服务的rpc信道失败, FLAGS_file_service); return -1; }TEST(get_test, multi_file) { //发起rpc调用进行文件下载 bite_im::FileService_Stub stub(channel.get()); bite_im::GetMultiFileReq req; bite_im::GetMultiFileRsp* rsp new bite_im::GetMultiFileRsp(); req.set_request_id(4444); req.add_file_id_list(multi_file_id[0]); req.add_file_id_list(multi_file_id[1]); brpc::Controller* cntl new brpc::Controller(); stub.GetMultiFile(cntl, req, rsp, nullptr); ASSERT_FALSE(cntl-Failed()); ASSERT_TRUE(rsp-success()); //将文件数据存储在文件中, //检测返回值下载是否成功 ASSERT_TRUE(rsp-file_data().find(multi_file_id[0]) ! rsp-file_data().end()); ASSERT_TRUE(rsp-file_data().find(multi_file_id[1]) ! rsp-file_data().end()); //写入 bite_im::writeFile(multi_download_file1, rsp-file_data().at(multi_file_id[0]).file_content()); bite_im::writeFile(multi_download_file2, rsp-file_data().at(multi_file_id[1]).file_content()); }etcdbrpc客户端和服务器的协作服务器创建brpc服务对象并实例化文件下载服务的实现类将服务注册到服务器中。服务器配置线程数和超时参数在指定端口上启动监听开始等待客户端连接。服务器创建etcd注册器建立到etcd的连接并创建一个3秒租约的保活对象。然后服务器向etcd写入自己的服务地址这个写入操作会绑定租约ID。写入成功后etcd就开始自动续约只要服务器进程存活租约就永不过期服务地址就一直存在于etcd中。至此服务器已经处于运行状态既在指定端口监听RPC请求又通过etcd对外公布自己的服务地址。客户端需要调用文件下载服务时首先创建etcd客户端连接到一个或多个etcd服务器。客户端通过服务发现进而调用上线函数生成一个channel进而调用choose得到一个channel如果客户端有负载均衡策略可能会轮询选择或者根据权重选择。我们是RR轮询策略channel初始化成功后Channel就建立了到服务器的网络连接可以随时发送请求。客户端创建服务Stub对象Stub是服务的本地代理将Channel对象传入Stub的构造函数。Stub内部持有了Channel的引用。客户端构造请求参数例如创建FileRequest对象设置要下载的文件名。同时创建FileResponse对象用于接收响应创建Controller对象用于控制调用行为比如设置超时、获取错误信息等。当客户端调用stub的Download方法时真正的远程调用开始了。Stub将请求参数按照protobuf协议序列化为二进制数据然后通过Channel将数据发送给服务器。Channel会封装网络包添加请求ID、方法名、元数据等信息通过socket发送出去。此时客户端的线程会阻塞在Download调用上等待服务器返回结果。服务器一直在指定端口上监听当收到客户端的网络包后服务器的网络线程接收数据进行初步的协议解析识别出这是一个RPC请求。服务器根据请求中的方法名找到对应的服务实现即之前注册的FileServiceImpl实例。服务器在独立的工作线程中调用FileServiceImpl的Download方法并将请求参数反序列化后传入Download方法执行真正的业务逻辑根据传入的文件名在指定目录下打开文件读取文件内容到内存中然后将文件内容设置到FileResponse对象中。如果文件不存在或读取失败则在Controller中设置错误信息。业务逻辑执行完毕后调用done的Run方法。这个Run方法会触发服务器框架将响应对象序列化为二进制数据通过网络发送回客户端。整个过程完成。客户端阻塞的Download调用被唤醒Channel接收到服务器的响应数据后反序列化填充到FileResponse对象中。同时Controller中会记录调用是否成功如果失败则包含错误信息。Download调用返回到客户端业务代码客户端检查Controller是否失败。如果成功就可以从FileResponse对象中获取文件内容和大小等数据完成一次完整的远程文件下载。
etcd和brpc的联合运作在即使通讯系统中的原理
发布时间:2026/5/28 5:26:49
目录etcd模块etcd的封装brpc模块brpc中Channel的封装brpc服务器brpc客户端etcdbrpc客户端和服务器的协作etcd模块由于etcd没有现成的封装好的类太过于麻烦了外面需要自己封装如下图片的功能。在项目中我们不需要构造etcd的服务器仅需要构造服务端进行服务发现和服务注册所以etcd的作用就是进行已知服务的发现和未知服务的注册这里仅仅支持在一个base目录下自定义服务路径的注册所以我们构造服务发现对象时需要指定base目录。etcd的封装namespace bite_im{ //服务注册客户端类 class Registry { public: using ptr std::shared_ptrRegistry; Registry(const std::string host): //通过服务器地址创建etcd服务器 _client(std::make_sharedetcd::Client(host)) , //创建 TTL3秒 的租约并启动自动续约如果进程崩溃租约会在3秒后过期etcd 自动删除注册的服务 _keep_alive(_client-leasekeepalive(3).get()), //从保活对象中获取租约ID _lease_id(_keep_alive-Lease()){} //取消保活连接释放租约资源 ~Registry() { _keep_alive-Cancel(); } bool registry(const std::string key, const std::string val) { //put(key, val, _lease_id)将键值对与租约绑定.get是为了阻塞等待结果 //key注册的服务名val服务地址 //将来可以通过根地址直接进行服务发现 auto resp _client-put(key, val, _lease_id).get(); //判断是否成功 if (resp.is_ok() false) { LOG_ERROR(注册数据失败{}, resp.error_message()); return false; } return true; } private: std::shared_ptretcd::Client _client; //etcd客户端对象不需要服务器 std::shared_ptretcd::KeepAlive _keep_alive; //租约对象用于自动续约 uint64_t _lease_id; //租约ID用于绑定键值对 }; //服务发现客户端类 class Discovery { public: using ptr std::shared_ptrDiscovery; using NotifyCallback std::functionvoid(std::string, std::string); //进行服务发现需要4个参数etcd服务器地址服务注册根目录新增服务回调下线服务回调 //hostetcd服务器地址 //basedir服务注册的根目录用于服务发现 //put_cb新增服务的回调函数参数为服务名和地址 //del_cb下线服务的回调函数参数为服务名和地址 Discovery(const std::string host, const std::string basedir, const NotifyCallback put_cb, const NotifyCallback del_cb): _client(std::make_sharedetcd::Client(host)) , _put_cb(put_cb), _del_cb(del_cb){ //先进行服务发现, 先获取到当前根目录以下子目录的已发现的全部节点 auto resp _client-ls(basedir).get(); if (resp.is_ok() false) { LOG_ERROR(获取服务信息数据失败{}, resp.error_message()); } //遍历全部数据调用上线函数进行处理(服务上线是为了机械能rpc调用主要进行构造channel) int sz resp.keys().size(); for (int i 0; i sz; i) { if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string()); } //然后进行事件监控监控数据发生的改变并调用回调进行处理 //watch(key, callback, recursive)监控key目录下的数据变化并调用回调函数处理 //basedir服务注册的根目录用于服务发现 //callback事件回调函数参数为etcd::Response对象 //recursive是否递归监控子目录的变化 _watcher std::make_sharedetcd::Watcher(*_client.get(), basedir, std::bind(Discovery::callback, this, std::placeholders::_1), true); } ~Discovery() { //取消事件监控 _watcher-Cancel(); } private: void callback(const etcd::Response resp) { if (resp.is_ok() false) { LOG_ERROR(收到一个错误的事件通知: {}, resp.error_message()); return; } //用于实时响应服务注册和下线 for (auto const ev : resp.events()) { //这个服务的类型是PUT就表示需要上线调用上线函数添加或者保存管道信息 if (ev.event_type() etcd::Event::EventType::PUT) { //ev.kv().key() // 键名如 /services/user-service/10.0.0.1:9000 //ev.kv().as_string() // 值如 10.0.0.1:9000 if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string()); LOG_DEBUG(新增服务{}-{}, ev.kv().key(), ev.kv().as_string()); //这个服务的类型是DELETE_就表示节点已经被删除需要下线, 调用下线函数删除节点管道信息 }else if (ev.event_type() etcd::Event::EventType::DELETE_) { //ev.prev_kv().key() // 被删除的键名 //ev.prev_kv().as_string() // 被删除的值 if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string()); LOG_DEBUG(下线服务{}-{}, ev.prev_kv().key(), ev.prev_kv().as_string()); } } } private: NotifyCallback _put_cb; //上线服务的回调函数 NotifyCallback _del_cb; //下线服务的回调函数 std::shared_ptretcd::Client _client; //etcd客户端对象不需要服务器 std::shared_ptretcd::Watcher _watcher; //事件监控对象 }; }只要感知到变换_watcher就会调用回调函数并自动传入变换的目录resp。brpc模块brpc中Channel的封装namespace bite_im { //1. 封装单个服务的信道管理类: class ServiceChannel { public: using ptr std::shared_ptrServiceChannel; //brpc::Channel using ChannelPtr std::shared_ptrbrpc::Channel; ServiceChannel(const std::string name): _service_name(name), _index(0){} //服务上线了一个节点则调用append新增信道 void append(const std::string host) { //创建一个 brpc 通信通道, 不需要任何参数的初始化 auto channel std::make_sharedbrpc::Channel(); //连接选项 brpc::ChannelOptions options; //连接超时-1 表示无限等待 options.connect_timeout_ms -1; //RPC 超时-1 表示无限等待 options.timeout_ms -1; //最大重试次数3次 options.max_retry 3; // 使用百度标准协议 options.protocol baidu_std; //根据 服务的host如 192.168.1.100:9000初始化连接 int ret channel-Init(host.c_str(), options); if (ret -1) { LOG_ERROR(初始化{}-{}信道失败!, _service_name, host); return; } std::unique_lockstd::mutex lock(_mutex); //建立通道与主机地址的映射关系显示主机已存在信道 _hosts.insert(std::make_pair(host, channel)); //添加信道 _channels.push_back(channel); } //服务下线了一个节点则调用remove释放信道 void remove(const std::string host) { std::unique_lockstd::mutex lock(_mutex); //查找并删除指定主机地址的一个信道的标识 auto it _hosts.find(host); if (it _hosts.end()) { LOG_WARN({}-{}节点删除信道时没有找到信道信息, _service_name, host); return; } //删除所有信道 for (auto vit _channels.begin(); vit ! _channels.end(); vit) { if (*vit it-second) { _channels.erase(vit); break; } } _hosts.erase(it); } //通过RR轮转策略获取一个Channel用于发起对应服务的Rpc调用 ChannelPtr choose() { std::unique_lockstd::mutex lock(_mutex); if (_channels.size() 0) { LOG_ERROR(当前没有能够提供 {} 服务的节点, _service_name); return ChannelPtr(); } int32_t idx _index % _channels.size(); return _channels[idx]; } private: std::mutex _mutex; int32_t _index; //当前轮转下标计数器 std::string _service_name;//服务名称 std::vectorChannelPtr _channels; //当前服务对应的信道集合 std::unordered_mapstd::string, ChannelPtr _hosts; //主机地址与信道映射关系 }; //总体的服务信道管理类 class ServiceManager { public: using ptr std::shared_ptrServiceManager; ServiceManager() {} //用RR轮转获取指定服务的节点信道 ServiceChannel::ChannelPtr choose(const std::string service_name) { std::unique_lockstd::mutex lock(_mutex); //假设原本的服务k值是/server/chat/instance这里信道管理的k值为/server/chat存储的全部都是这种形式 auto sit _services.find(service_name); if (sit _services.end()) { LOG_ERROR(当前没有能够提供 {} 服务的节点, service_name); return ServiceChannel::ChannelPtr(); } //RR轮转 return sit-second-choose(); } //先声明我关注哪些服务的上下线不关心的就不需要管理了 void declared(const std::string service_name) { std::unique_lockstd::mutex lock(_mutex); _follow_services.insert(service_name); } //onServiceOnline服务上线 //服务上线时调用的回调接口将服务节点管理起来 void onServiceOnline(const std::string service_instance, const std::string host) { //生成想要的k值形式 --- /server/chat/instance - /server/chat std::string service_name getServiceName(service_instance); ServiceChannel::ptr service; { std::unique_lockstd::mutex lock(_mutex); auto fit _follow_services.find(service_name); if (fit _follow_services.end()) { LOG_DEBUG({}-{} 服务上线了但是当前并不关心, service_name, host); return; } //先获取管理对象没有则创建有则添加节点 auto sit _services.find(service_name); if (sit _services.end()) { service std::make_sharedServiceChannel(service_name); _services.insert(std::make_pair(service_name, service)); }else { service sit-second; } } if (!service) { LOG_ERROR(新增 {} 服务管理节点失败, service_name); return ; } service-append(host); LOG_DEBUG({}-{} 服务上线新节点进行添加管理, service_name, host); } //onServiceOffline服务下线 //服务下线时调用的回调接口从服务信道管理中删除指定节点信道 void onServiceOffline(const std::string service_instance, const std::string host) { std::string service_name getServiceName(service_instance); ServiceChannel::ptr service; { std::unique_lockstd::mutex lock(_mutex); auto fit _follow_services.find(service_name); if (fit _follow_services.end()) { LOG_DEBUG({}-{} 服务下线了但是当前并不关心, service_name, host); return; } //先获取管理对象没有则创建有则添加节点 auto sit _services.find(service_name); if (sit _services.end()) { LOG_WARN(删除{}服务节点时没有找到管理对象, service_name); return; } service sit-second; } //我们仍保留着该节点的信道映射删除了信道不再对外提供服务了 service-remove(host); LOG_DEBUG({}-{} 服务下线节点进行删除管理, service_name, host); } private: //获取服务名称 std::string getServiceName(const std::string service_instance) { auto pos service_instance.find_last_of(/); if (pos std::string::npos) return service_instance; return service_instance.substr(0, pos); } private: std::mutex _mutex; std::unordered_setstd::string _follow_services; //当前关注的服务名称集合 std::unordered_mapstd::string, ServiceChannel::ptr _services; //服务名称对应的信道管理对象 }; }brpc服务器void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const string path ./data/) { _rpc_server make_sharedbrpc::Server(); //添加文件下载服务 FileServiceImpl* speech_service new FileServiceImpl(path); int ret _rpc_server-AddService(speech_service, brpc::SERVER_OWNS_SERVICE); if (ret -1) { LOG(ERROR) Fail to add service\n; abort(); } //设置brpc服务端参数 brpc::ServerOptions options; options.idle_timeout_sec timeout; options.num_threads num_threads; ret _rpc_server-Start(port, options); if (ret -1) { LOG(ERROR) Fail to start server\n; abort(); } }void make_reg_object(const string reg_host, const string service_name, const string access_host) { _reg_client make_sharedRegistry(reg_host); //注册服务 _reg_client-registry(service_name, access_host); }void start() { _rpc_server-RunUntilAskedToQuit(); }在.cc中调用fsb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads, FLAGS_storage_path); fsb.make_reg_object(FLAGS_registry_host, FLAGS_base_service FLAGS_instance_name, FLAGS_access_host); auto server fsb.build(); server-start(); return 0;我们用的是建造者模式brpc客户端//构造rpc信道管理对象 auto sm make_sharedbite_im::ServiceManager(); //声明要管理的服务 sm-declared(FLAGS_file_service); auto put_cb bind(bite_im::ServiceManager::onServiceOnline, sm.get(), placeholders::_1, placeholders::_2); auto del_cb bind(bite_im::ServiceManager::onServiceOffline, sm.get(), placeholders::_1, placeholders::_2); //构造服务发现对象 bite_im::Discovery::ptr dclient make_sharedbite_im::Discovery(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb); //通过rpc信道管理对象获取信道 channel sm-choose(FLAGS_file_service); if (!channel) { this_thread::sleep_for(chrono::seconds(1)); LOG_ERROR(获取{}服务的rpc信道失败, FLAGS_file_service); return -1; }TEST(get_test, multi_file) { //发起rpc调用进行文件下载 bite_im::FileService_Stub stub(channel.get()); bite_im::GetMultiFileReq req; bite_im::GetMultiFileRsp* rsp new bite_im::GetMultiFileRsp(); req.set_request_id(4444); req.add_file_id_list(multi_file_id[0]); req.add_file_id_list(multi_file_id[1]); brpc::Controller* cntl new brpc::Controller(); stub.GetMultiFile(cntl, req, rsp, nullptr); ASSERT_FALSE(cntl-Failed()); ASSERT_TRUE(rsp-success()); //将文件数据存储在文件中, //检测返回值下载是否成功 ASSERT_TRUE(rsp-file_data().find(multi_file_id[0]) ! rsp-file_data().end()); ASSERT_TRUE(rsp-file_data().find(multi_file_id[1]) ! rsp-file_data().end()); //写入 bite_im::writeFile(multi_download_file1, rsp-file_data().at(multi_file_id[0]).file_content()); bite_im::writeFile(multi_download_file2, rsp-file_data().at(multi_file_id[1]).file_content()); }etcdbrpc客户端和服务器的协作服务器创建brpc服务对象并实例化文件下载服务的实现类将服务注册到服务器中。服务器配置线程数和超时参数在指定端口上启动监听开始等待客户端连接。服务器创建etcd注册器建立到etcd的连接并创建一个3秒租约的保活对象。然后服务器向etcd写入自己的服务地址这个写入操作会绑定租约ID。写入成功后etcd就开始自动续约只要服务器进程存活租约就永不过期服务地址就一直存在于etcd中。至此服务器已经处于运行状态既在指定端口监听RPC请求又通过etcd对外公布自己的服务地址。客户端需要调用文件下载服务时首先创建etcd客户端连接到一个或多个etcd服务器。客户端通过服务发现进而调用上线函数生成一个channel进而调用choose得到一个channel如果客户端有负载均衡策略可能会轮询选择或者根据权重选择。我们是RR轮询策略channel初始化成功后Channel就建立了到服务器的网络连接可以随时发送请求。客户端创建服务Stub对象Stub是服务的本地代理将Channel对象传入Stub的构造函数。Stub内部持有了Channel的引用。客户端构造请求参数例如创建FileRequest对象设置要下载的文件名。同时创建FileResponse对象用于接收响应创建Controller对象用于控制调用行为比如设置超时、获取错误信息等。当客户端调用stub的Download方法时真正的远程调用开始了。Stub将请求参数按照protobuf协议序列化为二进制数据然后通过Channel将数据发送给服务器。Channel会封装网络包添加请求ID、方法名、元数据等信息通过socket发送出去。此时客户端的线程会阻塞在Download调用上等待服务器返回结果。服务器一直在指定端口上监听当收到客户端的网络包后服务器的网络线程接收数据进行初步的协议解析识别出这是一个RPC请求。服务器根据请求中的方法名找到对应的服务实现即之前注册的FileServiceImpl实例。服务器在独立的工作线程中调用FileServiceImpl的Download方法并将请求参数反序列化后传入Download方法执行真正的业务逻辑根据传入的文件名在指定目录下打开文件读取文件内容到内存中然后将文件内容设置到FileResponse对象中。如果文件不存在或读取失败则在Controller中设置错误信息。业务逻辑执行完毕后调用done的Run方法。这个Run方法会触发服务器框架将响应对象序列化为二进制数据通过网络发送回客户端。整个过程完成。客户端阻塞的Download调用被唤醒Channel接收到服务器的响应数据后反序列化填充到FileResponse对象中。同时Controller中会记录调用是否成功如果失败则包含错误信息。Download调用返回到客户端业务代码客户端检查Controller是否失败。如果成功就可以从FileResponse对象中获取文件内容和大小等数据完成一次完整的远程文件下载。