仿 muduo库 从零实现高并发TCP服务框架 (三)
仿 muduo库 从零实现高并发TCP服务框架-CSDN博客https://blog.csdn.net/2502_91433987/article/details/161161792?sharetypeblogdetailsharerId161161792sharereferPCsharesource2502_91433987spm1011.2480.3001.8118仿 muduo库 从零实现高并发TCP服务框架 二-CSDN博客https://blog.csdn.net/2502_91433987/article/details/161198186?sharetypeblogdetailsharerId161198186sharereferPCsharesource2502_91433987spm1011.2480.3001.8118附: 项目代码链接:https://gitee.com/waspqj/high-concurrency_serverhttps://gitee.com/waspqj/high-concurrency_server前面两章已经将功能性模块悉数完成还有EventLoop、LoopThread、loopThreadPool三个核心模块也搞定了最重要的是在这些模块的完成过程中我们理解并实践了高并发和多线程的思想。这章我们来实现TCP的连接管理重点放在连接的安全问题上……5. 模块实现5.8 Connection模块连接管理模块要管理套接字、用户级的缓冲区、请求上下文等等好些东西容我娓娓道来。有什么必须先简单解释一下TCP的面向字节流的特性一次传输不是以包为单位而是一整坨给你比如说上层是http服务一次传输你接受到的可能是多次请求也有可能是半次请求。我们接收就只能把每次接收到的拼接到一起放到缓冲区里通过协议将每次请求提取出来处理。同样的处理好要发送的数据也是一股脑放到发送缓冲区面向字节流的发出去……有了前置知识我们就可以来盘点一下一个连接需要有什么1. _con_id连接id唯一标识也作为timerid)、2. _sockfd _socket套接字、3. _statu状态机关键设计4. _context请求的上下文你接收数据的时候可能会收到或者留下不完整的请求上下文保存不完整的请求等后续拼接上5. _loop关联的IO线程、6. _channel事件管理、7. _buff_out_buff_in用户级发送和接收缓冲区、8. _silence_connect静默链接就是要不要打开超时断连机制 取了一个好听的名字9. 五大回调函数和Channel的事件回调区分开这里的回调是业务处理和上层管理的逻辑事件回调回调的就是Connection模块里的成员函数事件回调肯定要进行业务处理所以事件回调里肯定是要调用Conneciton的回调的// 处理连接模块 class Connection; typedef enum {DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING} CONNSTATU; using ConnectionPtr std::shared_ptrConnection; //模版类: 支持 获取封装this指针的shared_ptr class Connection : public std::enable_shared_from_thisConnection { private: uint64_t _con_id; // 连接的唯一标识id --- 也可以作为timer_id简化操作 int _sockfd; // 用于连接的文件描述符 CONNSTATU _statu; // 连接状态 bool _silence_connect; // 是否支持静默连接 --- 是否超时断连 std::any _context; // 请求的接收处理上下文 Socket _socket; // 套接字操作管理 EventLoop *_loop; // 关联的eventloop Channel _channel; // 事件管理模块 Buffer _buff_out; // 用户级输出缓冲区 Buffer _buff_in; // 用户级输入缓冲区 // 回调函数 --- 为上层使用设计,由服务器模块设置 using ConnectedCallback std::functionvoid(const ConnectionPtr ); using MessageCallback std::functionvoid(const ConnectionPtr , Buffer *); using CloseCallback std::functionvoid(const ConnectionPtr ); using AnyEventCallback std::functionvoid(const ConnectionPtr ); ConnectedCallback _connected_callback; // 连接成功调用的回调 MessageCallback _message_callback; // 接受到完整消息的处理回调 CloseCallback _close_callback; // 关闭连接执行的回调 AnyEventCallback _event_callback; // 事件触发默认回调 // 上层删除管理连接的方法 CloseCallback _server_close_callback; // 同样是关闭连接时执行 };干什么1. 给Channel提供回调函数就是事件触发之后要干什么1读触发说明系统缓冲区收到数据那么我读取数据用处理数据2写触发说明系统写缓冲区没满也说明用户发送缓冲区有数据每当向用户发送缓冲区写入数据就设置写监视3错误处理关闭处理处理接收缓冲区剩余数据关闭连接4任意事件处理如果设置了超时断连那么刷新定时器调用默认回调读写触发里的错误关闭逻辑和关闭逻辑后面会具体说// Channel的5个回调函数 void ReadHandler() { //1.读取数据到缓冲区 char buff[65536]; //64kb ssize_t ret _socket.Recv(buff, 65535); if(ret 0) // 出错了 return ShutdownInLoop(); // 检查缓冲区后关闭连接 // 读到的数据写入buff_in _buff_in.WriteAndPush(buff, ret); //2.主动调用回调,处理业务 // 如果输入缓冲区有数据,调用message回调 if(_buff_in.ReadableLen()) _message_callback(shared_from_this(), _buff_in); } void WriteHandler() { // 将buffout中数据手动发送 ssize_t ret _socket.Send(_buff_out.GetReadPos(), _buff_out.ReadableLen()); if(ret 0) // send出错了 { // 手动检测buff_in, 有数据则将数据先处理 if(_buff_in.ReadableLen()) _message_callback(shared_from_this(), _buff_in); // 真的关闭连接 -- 不能调shutdown套娃 return Release(); // return ReleaseInLoop(); } // 手动修正buffout的readpos --- 手动操作不能忘 _buff_out.MoveReadPos(ret); // 读完了,设置无需监控读,防止老被调,浪费 if(_buff_out.ReadableLen() 0) _channel.UnsetWrite(); // 当执行过Shutdown(半连接状态),还要最后把发送缓冲区的数据发出去, 直到: 1.发送出错 2.发送完了(这里) if(_statu DISCONNECTING) return Release(); // ReleaseInLoop(); 调用栈帧太深了; 还要考虑调用的顺序性,如果前面有读写操作正常压入任务池-先调用release关闭了fd..... } void CloseHandler() { // 如果有数据待处理,那先处理 if(_buff_in.ReadableLen() 0) _message_callback(shared_from_this(), _buff_in); return Release(); } void ErrorHandler() {CloseHandler(); } void EventHandler() { if(!_silence_connect) _loop-RefreshTimer(_con_id); //yeah, refresh here if(_event_callback) _event_callback(shared_from_this()); }2. 定时任务刷新和取消的接口。静默连接就是允许他一直连着不发请求静默嘛这里又出现这样的”InLoop“的接口设计前面的时间轮的三个接口也是如此。为什么这样设计来看对应的正常版本他们没有别的什么逻辑就干一件事RunInLoop把InLoop版本的接口用bind打包成任务发给EventLoop…… 为什么这样做还记得One Thread One Loop设计吗我说过它的本质是让一个线程独享一些资源也就是减少临时资源。每一个未知线程对连接的修改操作都是在对资源的所有权发起挑衅这是不可容忍的。所以不论刷新定时器还是删除定时任务我们都给他过一遍筛RunInLoop。class Connection : public std::enable_shared_from_thisConnection { void SilenceConnectInLoop() { _silence_connect true; if(_loop-HasTimer(_con_id)) _loop-CancelTimer(_con_id); } void RefuseSilenceConnectInLoop(int delay) { _silence_connect false; if(_loop-HasTimer(_con_id)) return _loop-RefreshTimer(_con_id); //若添加过, 刷新 return _loop-AddTimer(_con_id, delay, std::bind(Connection::Release, this)); //若原来没有添加, 新添一个 } public: void SilenceConnect() { if(_statu DISCONNECTED) return; // 已关闭连接状态,套接字被释放, 甚至连接管理也已经失效 _loop-RunInLoop(std::bind(Connection::SilenceConnectInLoop, this)); } void RefuseSilenceConnect(int delay 8) { if(_statu DISCONNECTED) return; // 已关闭连接状态,套接字被释放, 甚至连接管理也已经失效 _loop-RunInLoop(std::bind(Connection::RefuseSilenceConnectInLoop, this, delay)); } };3. 成员的初始化和设置……class Connection : public std::enable_shared_from_thisConnection { public: Connection(uint64_t id, int fd, EventLoop *loop) : _con_id(id), _sockfd(fd), _statu(CONNECTING), _silence_connect(true) , _socket(_sockfd), _loop(loop), _channel(_loop, _sockfd) { // 设置非阻塞 _socket.SetNonBlock(); // //设置channel回调 _channel.SetReadCall(std::bind(Connection::ReadHandler, this)); _channel.SetWriteCall(std::bind(Connection::WriteHandler, this)); _channel.SetCloseCall(std::bind(Connection::CloseHandler, this)); _channel.SetErrorCall(std::bind(Connection::ErrorHandler, this)); _channel.SetEventCall(std::bind(Connection::EventHandler, this)); } ~Connection() {LOG(DEBUG) connection closed: (%p, this) ;} int _Fd() {return _sockfd; } int _Id() {return _con_id; } void SetContext(const std::any context) {_context context; } std::any *GetContext() {return _context; } void SetConnectedCall(const ConnectedCallback cb) {_connected_callback cb; } void SetMessageCall(const MessageCallback cb) {_message_callback cb; } void SetCloseCall(const CloseCallback cb) {_close_callback cb; } void SetEventCall(const AnyEventCallback cb) {_event_callback cb; } void SetSrvCloseCall(const CloseCallback cb) {_server_close_callback cb; } };4. 开启连接工作状态关闭连接以及发送数据功能。必须先说一下状态机的设计了有四个状态DISCONNECTED已断开连接, CONNECTING连接中, CONNECTED已连接, DISCONNECTING断开连接中。收到新连接构造连接管理对象时是CONNECTING真正开启工作设置读监听时是CONNECTED触发错误或者主动断开连接先处理剩余数据时是DISCONNECTING真正关闭连接这个对象之后就要析构了时是DISCONNECTED。状态机是一个标记特别是断连是特别有用因为断连中你还要处理剩余数据在处理数据的逻辑里你必须要通过标记来决定到底做什么……后面我们来详细介绍关闭连接的流程。发送数据这块肯定是必备的message回调处理好的数据肯定要发送给用户的发送缓冲区再开启写监听把数据发给客户端。class Connection : public std::enable_shared_from_thisConnection { // 连接建立之后,设置状态, 真正的开始工作 void EstablishedInLoop() { //1.设置已连接状态 if(_statu CONNECTING) _statu CONNECTED; //2.经典开局设置读监听 _channel.SetRead(); //3.调用连接回调 if(_connected_callback) _connected_callback(shared_from_this()); } // 发送数据到客户端 void SendInLoop(Buffer data) { // 写入缓冲区先 _buff_out.WriteAndPush(data); // 打开写监听 if(!_channel.IsWrite()) _channel.SetWrite(); } //真正的关闭连接 void ReleaseInLoop() { _channel.Remove(); // 取消poller的监控 _socket.Close(); // 关闭文件描述符 // 打开静默状态, 就是关掉定时器,防止后面再调用什么相关接口 if(!_silence_connect) SilenceConnectInLoop(); // 调用关闭回调 if(_close_callback) _close_callback(shared_from_this()); // 调用管理端关闭回调 if(_server_close_callback) _server_close_callback(shared_from_this()); // while(1) sleep(1); } // 关闭连接前,处理剩下的数据 void ShutdownInLoop() { // 0.设置半连接状态*--- 让后续write回调时通过状态去调用release _statu DISCONNECTING; // 1.先看看有没有数据需要处理 if(_buff_in.ReadableLen()) _message_callback(shared_from_this(), _buff_in); // 2.有没有数据需要最后发送, 设置写事件监控,自然会调用channel的写事件回调 if(_buff_out.ReadableLen() !_channel.IsWrite()) _channel.SetWrite(); // 3.如果没有数据可以发了, release if(_buff_out.ReadableLen() 0) Release(); // 这里没有成功调用Release也不用担心, 已经设置好了半连接状态, 后续发送完毕就会调用release } // 切换协议 void UpgradeInLoop(const std::any context, const ConnectedCallback connect, const MessageCallback message, const CloseCallback close, const AnyEventCallback event) { _context context; _connected_callback connect; _message_callback message; _close_callback close; _event_callback event; } public: void Established() { if(_statu DISCONNECTED) return; // 已关闭连接状态,套接字被释放, 甚至连接管理也已经失效 _loop-RunInLoop(std::bind(Connection::EstablishedInLoop, this)); } void Send(const char *data, size_t len) { if(_statu DISCONNECTED) return; // 已关闭连接状态,套接字被释放, 甚至连接管理也已经失效 //data可能是临时对象, 转成buffer Buffer buff; buff.WriteAndPush(data, len); // 在bind这样的打包器里用move转换普通对象非常好使---bind里会保存一个左值副本,刚好对应move后的右值引用,移动构造 // LOG(DEBUG) buff.GetReadPos() : data; _loop-RunInLoop(std::bind(Connection::SendInLoop, this, std::move(buff))); } void Release() //真正的关闭连接 { if(_statu DISCONNECTED) return; // 防止重入release // 只要调用了Release并把ReleaseInLoop加入任务队列, 就设置关闭连接状态, 不允许其他回调加入任务队列 (在执行完ReleaseInLoop后,socket已关闭,连接对象已经被销毁) _statu DISCONNECTED; _loop-QueueInLoop(std::bind(Connection::ReleaseInLoop, shared_from_this())); } void Shutdown() { if(_statu DISCONNECTED) return; // 已关闭连接状态,连接管理也已经失效 _loop-RunInLoop(std::bind(Connection::ShutdownInLoop, this)); } // 切换协议 void Upgrade(const std::any context, const ConnectedCallback connect, const MessageCallback message, const CloseCallback close, const AnyEventCallback event) { if(_statu DISCONNECTED) return; // 已关闭连接状态,套接字被释放, 甚至连接管理也已经失效 _loop-QueueInLoop(std::bind(Connection::UpgradeInLoop, this, context, connect, message, close, event)); } // 考虑到协议切换的顺序性, 强制插入任务队列 // {_loop-RunInLoop(std::bind(Connection::UpgradeInLoop, this, context, connect, message, close, event)); } };关闭流程回头再看读写回调来梳理一下关闭流程1. 读出错直接调用ShutDownInLoop进入处理剩余数据逻辑处理接收缓冲区剩余数据如果发送缓冲区有数据设置写监听在写回调逻辑的最后当状态设置为半断连时直接Release只发送完这最后一次就行了如果发送缓冲区数据没有数据直接Release了。2. 写错误可不能再调用ShutDownInLoop这里又会跑回来写又错误不就死循环了所以我们直接显示处理剩余数据然后Release。3.直接调用关闭已经包含在前面的逻辑里了。void ReadHandler() { //1.读取数据到缓冲区 char buff[65536]; //64kb ssize_t ret _socket.Recv(buff, 65535); if(ret 0) // 出错了 return ShutdownInLoop(); // 检查缓冲区后关闭连接 // 读到的数据写入buff_in _buff_in.WriteAndPush(buff, ret); //2.主动调用回调,处理业务 // 如果输入缓冲区有数据,调用message回调 if(_buff_in.ReadableLen()) _message_callback(shared_from_this(), _buff_in); } void WriteHandler() { // 将buffout中数据手动发送 ssize_t ret _socket.Send(_buff_out.GetReadPos(), _buff_out.ReadableLen()); if(ret 0) // send出错了 { // 手动检测buff_in, 有数据则将数据先处理 if(_buff_in.ReadableLen()) _message_callback(shared_from_this(), _buff_in); // 真的关闭连接 -- 不能调shutdown套娃 return Release(); // return ReleaseInLoop(); } // 手动修正buffout的readpos --- 手动操作不能忘 _buff_out.MoveReadPos(ret); // 读完了,设置无需监控读,防止老被调,浪费 if(_buff_out.ReadableLen() 0) _channel.UnsetWrite(); // 当执行过Shutdown(半连接状态),还要最后把发送缓冲区的数据发出去, 直到: 1.发送出错 2.发送完了(这里) if(_statu DISCONNECTING) return Release(); // ReleaseInLoop(); 调用栈帧太深了; 还要考虑调用的顺序性,如果前面有读写操作正常压入任务池-先调用release关闭了fd..... }看完关闭流程你一定还有一些疑惑比如为什么直接调用ShutDownInLoop而不是Shutdown又为什么要调用Release而不是ReleaseInLoop为什么shutdown的状态码在ShutDownInLoop里面更新而Release却不是以及为什么很多接口都先判断状态为DISCONNECTED就不执行了……这Connecion模块的代码量还有大量的这些细节太恐怖了肝不动了……下章我们将完善整个TCP服务框架并对一些问题和细节进行分析持续关注未完待续……