在基于 Reactor 模型的 C 高性能网络库中应用层缓冲区Buffer与TCP 连接管理TcpConnection是连接建立、数据收发、异常处理的核心组件。一、整体结构与核心定位Buffer TcpConnection 是网络库连接层 IO 层的核心Buffer用户态动态缓冲区解决非阻塞 socket 数据收发的粘包、半包、缓冲区不足问题TcpConnection封装一条 TCP 连接的完整生命周期管理 socket、channel、读写缓冲区、回调、状态二者共同构成网络库的数据通道 连接载体。二、代码精讲第一段buffer.h#pragma once /* 封装实现一个用户态的套接字收发缓冲区用于后边实现的Connection连接对象 */ #include vector #include string #include optional namespace net { class Buffer { public: Buffer(); Buffer(const Buffer other); Buffer(Buffer other); Buffer operator(const Buffer other); Buffer operator(Buffer other); void swap(Buffer other); // 获取可读起始地址 const char *peek()const; // 获取可读数据大小 size_t readableBytes() const; // 可写空间⼤⼩ size_t writableBytes() const; // 向后移动_readerIndex若⼩于可读数据⼤⼩则向后偏移否则就重置读写索引 void retrieve(size_t len); // 数据被全部取出则重置读写索引 void retrieveAll(); // 获取所有数据到string中并偏移读索引 std::string retrieveAllAsString(); // 获取指定⻓度数据到string中并偏移读索引 std::string retrieveAsString(size_t len); // 判断扩容向末尾追加数据偏移写索引 std::copy void append(const void *data, size_t len); // 若可写空间⼤⼩⼩于len则扩容 void ensureWritableBytes(size_t len); // 获取写起始地址 char *beginWrite(); // 偏移写索引 void hasWritten(size_t len); // 分块读取数据到buffer和临时空间中 // 若实际读取⻓度⼩于等于buffer可写空间⼤⼩则偏移写索引即可 // 若实际读取⻓度⼤于buffer可写空间⼤⼩则需要将临时空间的数据追加到buffer中 ssize_t readFd(int fd, int *savedErrno); //追加getline prependableBytes prepend size_t prependableBytes(); void prepend(const void *data,size_t len); //c17特性 optional int size() std::optionalstd::string getline(); private: // 获取空间起始地址 const char *begin()const; char *begin(); // 空间扩容若前置⼤⼩末尾空闲⼤⼩ ⽬标⻓度预留则直接扩容到末尾 // 若前置⼤⼩末尾空闲⼤⼩ ⽬标⻓度预留则将后边的数据拷⻉到前边去 void makeSpace(size_t len); private: const static int kInitialSize 1024; // 默认缓冲区大小 const static int kCheapPrepend 8; // 前置默认预留空间大小 // 使用vector进行空间管理 std::vectorchar _buffer; size_t _reader_idx; // 读位置索引 size_t _writer_idx; // 写位置索引 }; }头文件依赖vector底层动态内存管理string数据返回与格式化optionalC17 安全返回getlinecassert运行时断言校验Buffer 类设计思想应用层非阻塞 socket 收发缓冲区支持自动扩容、数据移动、内存紧凑、前置预留空间面向 TcpConnection 提供统一读写接口核心成员变量std::vectorchar _buffer动态内存容器_reader_idx / _writer_idx读写指针索引kInitialSize默认缓冲区大小1024kCheapPrepend头部预留空间8 字节公开接口设计构造 / 拷贝 / 移动 / 赋值 /swap现代 C 五法则可读 / 可写空间获取数据追加、读取、回收、格式化readFd从 fd 分散读取数据prepend头部插入数据getline按行读取支持 \n私有接口begin()缓冲区起始地址makeSpace()内存扩容与紧凑算法第二段buffer.cc 知识点#include buffer.h #include cassert #includecstring #includesys/uio.h #include iostream namespace net { Buffer::Buffer() : _buffer(kInitialSize), _reader_idx(kCheapPrepend), _writer_idx(kCheapPrepend) { } Buffer::Buffer(const Buffer other) : _buffer(other._buffer), _reader_idx(other._reader_idx), _writer_idx(other._writer_idx) { } Buffer::Buffer(Buffer other) { Buffer tmp; tmp.swap(other); tmp.swap(*this); } Buffer Buffer::operator(const Buffer other) { Buffer tmp(other); tmp.swap(*this); return *this; } Buffer Buffer::operator(Buffer other) { Buffer tmp(std::move(other)); tmp.swap(*this); return*this; } void Buffer::swap(Buffer other) { _buffer.swap(other._buffer); std::swap(_reader_idx, other._reader_idx); std::swap(_writer_idx, other._writer_idx); } // 获取可读起始地址 const char *Buffer::peek() const { return begin() _reader_idx; } // 获取可读数据大小 size_t Buffer::readableBytes() const { return _writer_idx - _reader_idx; } // 可写空间⼤⼩ size_t Buffer::writableBytes() const { return _buffer.size() - _writer_idx; } // 向后移动_readerIndex若⼩于可读数据⼤⼩则向后偏移否则就重置读写索引 void Buffer::retrieve(size_t len) { assert(len readableBytes()); _reader_idx len; } // 数据被全部取出则重置读写索引 void Buffer::retrieveAll() { retrieve(readableBytes()); } // 获取所有数据到string中并偏移读索引 std::string Buffer::retrieveAllAsString() { return retrieveAsString(readableBytes()); } // 获取指定⻓度数据到string中并偏移读索引 std::string Buffer::retrieveAsString(size_t len) { assert(len readableBytes()); std::string retval; // 获取数据到string 中 retval.assign(peek(), len); // 偏移读指针 retrieve(len); return retval; } // 判断扩容向末尾追加数据偏移写索引 std::copy void Buffer::append(const void *data, size_t len) { // 1.确保空闲空间足够不够就需要扩容 ensureWritableBytes(len); // 2.将数据拷贝到缓冲区中 std::copy((const char*)data, (const char*)data len, beginWrite()); // 3.将写指针向后偏移 hasWritten(len); //std::cout[std::string((const char*)data,len)]\n; } // 若可写空间⼤⼩⼩于len则扩容 void Buffer::ensureWritableBytes(size_t len) { // 检查后置空闲空间是否足够够了就返回不够就扩容 if (len writableBytes()) { return; } else { makeSpace(len); } } // 获取写入起始地址 char *Buffer::beginWrite() { return begin() _writer_idx; } // 偏移写索引 void Buffer::hasWritten(size_t len) { assert(len writableBytes()); _writer_idx len; } // 获取空间起始地址 const char *Buffer::begin() const { return _buffer[0]; } char *Buffer::begin() { return _buffer[0]; } size_t Buffer::prependableBytes() { return _reader_idx; } // 空间扩容若前置⼤⼩末尾空闲⼤⼩ ⽬标⻓度(len)预留则直接扩容到末尾 // 若前置⼤⼩末尾空闲⼤⼩ ⽬标⻓度预留则将后边的数据拷⻉到前边去 void Buffer::makeSpace(size_t len) { if (prependableBytes() writableBytes() len kCheapPrepend) { _buffer.resize(_writer_idx len); } else { assert(kCheapPrepend _reader_idx); // 1.获取可读数据大小 size_t dataSize readableBytes(); // 2.将数据拷贝到起始位置前置还是有一个固定的预留空间 std::copy(begin() _reader_idx, begin() _writer_idx, begin() kCheapPrepend); // 3.重置读写指针位置 _reader_idx kCheapPrepend; _writer_idx _reader_idx dataSize; assert(dataSize readableBytes()); } } void Buffer::prepend(const void *data, size_t len){ assert(lenprependableBytes()); //将读指针向前偏移len字节 _reader_idx-len; //拷贝数据到读指针指向位置 std::copy((const char*)data,//要拷贝的目标数据的起始地址 (const char*)datalen,//要拷贝的目标数据的结束地址 begin()_reader_idx); } // c17特性 optional int size() std::optionalstd::string Buffer::getline(){ //memchr 从可读数据中查找\n void* posmemchr((void*)peek(),\n,readableBytes()); if(posNULL) { return std::nullopt; } //构造string对象拷贝数据进去 size_t lineSize(char*)pos-peek()1; //std::string str(peek(),lineSize); //std::cout获取数据[str/\n; //将读指针向后偏移 return retrieveAsString(lineSize); } // 分块读取数据到buffer和临时空间中 // 若实际读取⻓度⼩于等于buffer可写空间⼤⼩则偏移写索引即可 // 若实际读取⻓度⼤于buffer可写空间⼤⼩则需要将临时空间的数据追加到buffer中 // 从描述符中读取数据到缓冲区中 // 缓冲区剩余空间大小是固定的但是fd能读到的数据是不固定要用到的分块读取 ssize_t Buffer::readFd(int fd, int *savedErrno) { //ssize_t readv(int fd, const struct iovec *iov, int iovcnt); //ssize_t read(int fd, void *buf, size_t count); ssize_t wableSizewritableBytes(); char temp[65536];//64k struct iovec vec[2]; vec[0].iov_basebeginWrite();//读取的数据优先写入的位置 vec[0].iov_lenwableSize;//该位置最大写入的数据长度 vec[1].iov_basetemp;//第二个备用空间 vec[1].iov_len65536; //1.判断是否需要使用备用空间 int countwableSize65536?1:2; ssize_t retreadv(fd,vec,count); //返回值是在实际读取到的数据量判断是否实际读取的数据大小小于writableBytes() //ret writableBytes(),代表第二块备用空间没用上偏移指针即可 //retwritableBytes(),代表备用空间中必然有一部分数据ret-writeableBytes() //这种情况就需要将备用空间的数据追加到缓冲区中 if(ret0) { *savedErrnoerrno; } else if(ret wableSize) { hasWritten(ret);//将写指针向后偏移 }else{ hasWritten(wableSize);//将写指针直接偏移到末尾 append(temp,ret-wableSize); } return ret; } }现代 C 内存管理拷贝构造、移动构造、拷贝赋值、移动赋值swap高效交换资源vector 自动内存管理缓冲区核心算法双指针读写模型读指针、写指针分离内存紧凑机制空间不足时将数据前移减少扩容自动扩容策略不够则扩容够则紧凑前置预留空间支持头部快速插入数据Linux 高性能 IOreadv分散读一次读取两块不连续内存64KB 临时栈缓冲区避免内存浪费错误码保存机制字符串与数据处理memchr查找换行符std::copy安全内存拷贝assignstring 安全构造retrieve读指针后移自动回收空间C17 特性std::optional安全返回避免空字符串歧义移动语义优化性能工程健壮性大量assert保证逻辑合法性边界条件全覆盖自动内存整理无内存泄漏第三段connection.h 知识点#pragma once #include channel.h #include details.h #include socket.h #include buffer.h #include any #include cassert namespace net { class TcpConnection; using TcpConnectionPtr std::shared_ptrTcpConnection; using ConnectionCallback std::functionvoid(TcpConnectionPtr); using MessageCallback std::functionvoid(TcpConnectionPtr, Buffer *, Timestamp); using CloseCallback std::functionvoid(TcpConnectionPtr); // 让 TcpConnection 这个类具备在成员函数里安全获取自身 shared_ptr // 的能力 防止对象还在使用中就被析构悬空指针 class TcpConnection : public std::enable_shared_from_thisTcpConnection { enum State { kDisconnected, kConnecting, kConnected, kDisconnecting }; public: // 初始化成员设置channel回调设置连接保活 TcpConnection(EventLoop *loop, int fd, int id) : _id(id), _state(kConnecting), _loop(loop), _socket(new Socket(fd)), _channel(new Channel(loop, fd)) { // 设置套接字选项连接保活 _socket-setKeepAlive(true); // 设置channel回调 _channel-setReadCallback(std::bind(TcpConnection::handleRead, this, std::placeholders::_1)); _channel-setWriteCallback(std::bind(TcpConnection::handleWrite, this)); _channel-setCloseCallback(std::bind(TcpConnection::handleClose, this)); _channel-setErrorCallback(std::bind(TcpConnection::handleError, this)); LOG_DEBUG(construct TcpConnection at:%p - %d, this, _socket-fd()); } // 没有什么要做的 ~TcpConnection() { LOG_DEBUG(distruct TcpConnection at:%p - %d, this, _socket-fd()); assert(_state kDisconnected); } // 成员操作 int id() { return _id; } bool connected() { return _state kConnected; } bool disconnected() { return _state kDisconnected; } EventLoop *loop() { return _loop; } Buffer *inputBuffer() { return _inputBuffer; } Buffer *outputBuffer() { return _outputBuffer; } void setContext(std::any context) { _context std::move(context); } const std::any context() const { return _context; } std::any mutableContext() { return _context; } void setConnectionCallback(ConnectionCallback cb) { _onConnection std::move(cb); } void setMessageCallback(MessageCallback cb) { _onMessage std::move(cb); } void setCloseCallback(CloseCallback cb) { _onClose std::move(cb); } void send(const void *data, size_t len); void forceClose(); void connectEstablished(); void connectDestroyed(); private: void sendInLoop(const void *data, size_t len); void sendInLoop(const std::string str); void forceCLoseInLoop(); void handleRead(Timestamp recvTime); void handleWrite(); void handleClose(); void handleError(); private: const int64_t _id; // 标识ID State _state; // 连接状态 EventLoop *_loop; std::unique_ptrSocket _socket; // 描述符对应的套接字对象 std::unique_ptrChannel _channel; // 描述符事件处理对象 Buffer _inputBuffer; // 输入缓冲区内核-》用户 Buffer _outputBuffer; // 输出缓冲区用户-》内核 std::any _context; // 上下文对象 ConnectionCallback _onConnection; // MessageCallback _onMessage; //消息到达回调 CloseCallback _onClose; }; }头文件依赖channel.h事件分发socket.h套接字管理buffer.h读写缓冲区any连接上下文存储memory智能指针functional函数回调回调类型定义ConnectionCallback连接建立 / 关闭回调MessageCallback消息到达回调CloseCallback内部关闭回调TcpConnectionPtrshared_ptrTcpConnectionTcpConnection 核心设计继承enable_shared_from_this安全获取自身智能指针连接状态机kDisconnected/kConnecting/kConnected/kDisconnecting核心成员EventLoop* _loop归属 IO 线程unique_ptrSocket套接字管理unique_ptrChannel事件管理Buffer _inputBuffer / _outputBuffer读写缓冲区std::any _context用户自定义上下文各类回调函数对象公开接口send()发送数据forceClose()强制关闭连接connectEstablished/Destroyed连接建立 / 销毁缓冲区、状态、ID 获取接口第四段connection.cc 知识点#include connection.h #include eventloop.h #include cassert namespace net { void TcpConnection::send(const void *data, size_t len){ //若发送数据的时候就在loop线程中则直接发送否则将send任务压入到loop任务池中 if(_loop-isInLoopThread()) { sendInLoop(data,len); }else{ std::string str((char*)data,len); void(TcpConnection::*fp)(const std::string)TcpConnection::sendInLoop; _loop-runInLoop(std::bind(fp,this,str)); } } void TcpConnection::sendInLoop(const void *data, size_t len){ _loop-assertInLoopThread(); //1.若当前没有进行描述符的写事件监控且发送缓冲区中没有数据 // 则直接write/send数据 否则将数据放入到发送缓冲区中 //2.若直接发送数据数据没有发送完就要把剩余数据放入到自己的发送缓冲区开启写事件监控 bool errFlagfalse;//错误标志用于保存write是否出错了 int32_t leftSizelen; int32_t nwrote0; if(!_channel-isWriting()_outputBuffer.readableBytes()0) { nwrotesockets::write(_socket-fd(),data,len); if(nwrote0) { leftSize-nwrote; }else{ LOG_ERROR(写入数据失败:%d,_socket-fd()); errFlagtrue; } } if(leftSize0errFlagfalse){ //将剩余数据放入缓冲区 _outputBuffer.append((const char*)datanwrote,leftSize); if(!_channel-isWriting()){ _channel-enableReading();//开启写事件jiankong } } } void TcpConnection::sendInLoop(const std::string str){ sendInLoop((const void*)str.c_str(),str.size()); } void TcpConnection::forceClose(){ //连接不管是不是在线程中都不能直接关闭必须压任务到loop任务池中进行 //主要是为了避免任务池中有send任务 _statekDisconnecting;//连接状态设置为关闭 _loop-queueInLoop(std::bind(TcpConnection::forceCLoseInLoop,shared_from_this())); } void TcpConnection::forceCLoseInLoop(){ _loop-assertInLoopThread(); //设置状态为kdisConnected解除事件监控移除监控管理调用回调函数... if(_statekConnected||_statekDisconnecting){ handleClose(); } //kDisconnecting状态说明这是调用forceClose的时候设置的 //kConnected状态说明整个服务端释放资源的时候会调用connectionCallback // 内部可能用户调用conn-forceClose() } //这是实际的释放资源回调 void TcpConnection::handleClose(){ //设置状态为kDisconnected解除事件监控调用回调函数 _loop-assertInLoopThread(); //这个断言保证了handleClose永远只会被调用一次 assert(_statekConnected||_statekDisconnecting); _statekDisconnected; _channel-disableAll(); //额外保存一份只能指针对象防止提前释放 TcpConnectionPtr gardThis(shared_from_this()); if(_onConnection) _onConnection(gardThis);//这是用户设置的连接建立/关闭回调函数 if(_onClose) _onClose(gardThis);//这是框架内部的资源释放回调处理函数 } void TcpConnection::connectEstablished(){ //连接对象所有初始化工作都完成后才会被调用的接口而且必须在loop线程中 _loop-assertInLoopThread(); assert(_statekConnecting); _statekConnected; //设置channel中的观察者对象 _channel-tie(shared_from_this());// 绑定自己防止析构 //启动channel的读事件监控 _channel-enableReading();// 开启读事件监听 //调用连接建立回调函数通知用户 if(_onConnection) _onConnection(shared_from_this()); } void TcpConnection::connectDestroyed(){ //这个接口是整个服务器TcpServer析构的时候释放连接对象时调用 _loop-assertInLoopThread(); if(_statekConnected){//连接建立中 _channel-disableAll(); if(_onConnection) _onConnection(shared_from_this()); } _channel-remove();//移除事件监控管理 } void TcpConnection::handleRead(Timestamp recvTime){ _loop-assertInLoopThread(); //将socket内核缓冲区的数据读取到inputbuffer中然后调用mssageCallback int errNum; ssize_t n_inputBuffer.readFd(_socket-fd(),errNum); if(n0){ //连接断开 return handleClose(); }else if(n0){ //读取出错 return handleError(); } if(_onMessage) _onMessage(shared_from_this(),_inputBuffer,recvTime); } void TcpConnection::handleWrite(){//写事件触发的时候调用 _loop-assertInLoopThread(); if(_channel-isWriting()false) { //连接半关闭 LOG_ERROR(CONNECTION FD %d is shutdown write,_socket-fd()); return; } ssize_t nsockets::write(_socket-fd(),_outputBuffer.peek(),_outputBuffer.readableBytes()); if(n0) { LOG_ERROR(发送数据错误); return; } _outputBuffer.retrieve(n);//将发送缓冲区的读指针向后偏移 //需要考虑发送缓冲区中的数据有没有发送完毕发送缓冲区没有数据了关闭写事件监控 if(_outputBuffer.readableBytes()0){ // 发完关闭写监听避免空转 _channel-disableWriting(); } } void TcpConnection::handleError(){ _loop-assertInLoopThread(); LOG_ERROR(连接操作出错); } }Reactor 模型事件驱动Channel 绑定读 / 写 / 关闭 / 错误回调事件触发 → 调用对应 handle 函数完全非阻塞 IO 模型线程安全保证isInLoopThread判断是否在 IO 线程runInLoop/queueInLoop跨线程安全调用所有 IO 操作必须在 IO 线程执行数据发送逻辑send直接发送 缓冲区排队双策略写事件监控按需开启 / 关闭未发完数据存入 outputBuffer避免重复开启写事件导致 CPU 空转读事件处理handleReadreadFd分散读取到 inputBuffer读取长度 0 → 对端关闭 → handleClose读取成功 → 调用 messageCallback写事件处理handleWrite从 outputBuffer 取出数据发送发送完成自动关闭写事件错误处理与日志输出连接关闭与状态机handleClose关闭状态流转forceClose安全强制关闭状态保证只关闭一次不重复析构shared_from_this防止连接提前释放智能指针安全gardThis保护连接生命周期tie(shared_from_this())绑定 Channel 防止悬空工程级鲁棒性大量断言保证状态合法性错误日志清晰资源自动释放上下文任意类型存储套接字选项setKeepAliveTCP 保活三、Buffer 类高性能应用层缓冲区设计思想1. 双指针无拷贝设计Buffer 使用读指针 写指针实现高效数据管理无需频繁移动内存读取只移动指针写入直接追加空间不足自动紧凑或扩容2. 自动内存紧凑核心亮点当尾部空间不足时将有效数据整体前移释放头部空闲空间减少不必要的内存分配大幅提升性能3. readv 分散读Linux 高性能 IO一次 readv 读取两块内存Buffer 可写区域栈上临时 64KB 空间 避免缓冲区不足导致数据丢失。4. 行读取支持getline()使用 memchr 快速查找 \n适合 HTTP、Redis、自定义文本协议。四、TcpConnection 类TCP 连接生命周期管理1. 连接状态机非常关键kConnecting连接中kConnected已连接kDisconnecting关闭中kDisconnected已关闭所有操作严格按状态流转避免逻辑混乱。2. 线程安全所有 IO、事件、关闭操作必须在 IO 线程执行。 跨线程调用通过runInLoop转发无锁、高效、安全。3. 智能指针安全继承enable_shared_from_this确保回调执行时连接不会被释放Channel 不会出现悬空指针资源安全释放4. 双缓冲区设计inputBuffer内核 → 用户outputBuffer用户 → 内核完美解决非阻塞 socket 发送不完整、粘包半包问题。五、整体工作流程最清晰连接建立 → TcpConnection 构造绑定 Channel 回调 → 开启读事件数据到达 → 触发 handleRead → readFd 读入 Buffer → 消息回调发送数据 → 直接写 / 存入 outputBuffer → 开启写事件写事件触发 → 发送缓冲区数据 → 发完关闭写事件对端关闭 / 主动关闭 → 状态机关闭 → 资源释放 → 回调通知六、总结最精华Buffer 核心价值双指针高效管理自动扩容 内存紧凑分散读高性能行读取支持协议解析无拷贝、无泄漏、高稳定TcpConnection 核心价值完整 TCP 连接生命周期管理Reactor 事件驱动线程安全、跨线程安全发送状态机保证健壮智能指针防止悬空双缓冲区解决非阻塞收发问题