上一篇【第16篇】RecordAccumulator源码深度解析——Kafka生产者的消息缓冲区秘密下一篇【第18篇】BufferPool源码解析——Kafka生产者的内存管家摘要消息到了RecordAccumulator最终被打包放在哪里答案是MemoryRecords——一个封装了Java NIO ByteBuffer的数据容器再加上Compressor提供的压缩能力组成了Kafka最底层的消息存储单元。RecordBatch则是在MemoryRecords之上加了一层管理外壳负责追踪批次状态、管理回调队列。本文将从MemoryRecords的ByteBuffer封装说起解析RecordBatch的完整结构梳理消息格式从V0到V2的演进历程最后对比GZIP/Snappy/LZ4/ZSTD四种压缩算法的特性。读完这篇你会真正理解Kafka消息长什么样子。一、MemoryRecords——消息的物理容器1.1 整体架构【MemoryRecords 核心结构】 MemoryRecords ┌─────────────────────────────────────────────┐ │ │ │ buffer: ByteBuffer ←── 消息数据的存储容器 │ │ writeLimit: int ←── buffer最多写入多少 │ │ writable: boolean ←── 是否可写模式 │ │ │ │ compressor: Compressor (可选压缩器) │ │ ┌────────────────────────────────────┐ │ │ │ appendStream: DataOutputStream │ │ │ │ │ (装饰器: 提供压缩功能) │ │ │ │ ▼ │ │ │ │ bufferStream: ByteBufferOutputStream│ │ │ │ │ (装饰器: 提供自动扩容功能) │ │ │ │ ▼ │ │ │ │ buffer: ByteBuffer ←── 最终目的地 │ │ │ └────────────────────────────────────┘ │ └─────────────────────────────────────────────┘1.2 压缩器的装饰器模式这是Kafka中装饰器模式Decorator Pattern的经典应用// Compressor构造时根据压缩类型选择不同的包装层publicCompressor(ByteBufferbuffer,CompressionTypetype){this.typetype;// 第一层ByteBuffer → ByteBufferOutputStream自动扩容bufferStreamnewByteBufferOutputStream(buffer);// 第二层根据压缩类型选择合适的输出流装饰器appendStreamwrapForOutput(bufferStream,type,COMPRESSION_DEFAULT_BUFFER_SIZE);}publicstaticDataOutputStreamwrapForOutput(ByteBufferOutputStreambuffer,CompressionTypetype,intbufferSize){try{switch(type){caseNONE:// 不压缩直接包装DataOutputStreamreturnnewDataOutputStream(buffer);caseGZIP:// GZIP压缩JDK内置直接newreturnnewDataOutputStream(newGZIPOutputStream(buffer,bufferSize));caseSNAPPY:// Snappy压缩非JDK标准库反射创建避免依赖缺失报错OutputStreamstream(OutputStream)snappyOutputStreamSupplier.get().newInstance(buffer,bufferSize);returnnewDataOutputStream(stream);caseLZ4:// LZ4压缩同样使用反射方式创建// ...类似SNAPPY的实现default:thrownewIllegalArgumentException(Unknown compression type: type);}}catch(IOExceptione){thrownewKafkaException(e);}}为什么GZIP直接newSnappy/LZ4却用反射答GZIP的GZIPOutputStream是JDK内置的但Snappy和LZ4需要额外引入JAR包。用反射可以做到——不引入Snappy包时代码照样编译通过只有当实际使用时才加载类。这是可选的依赖的经典处理方式。1.3 MemoryRecords的核心方法publicclassMemoryRecords{// 构造方法是私有的只能通过emptyRecords()创建privateMemoryRecords(ByteBufferbuffer,CompressionTypecompressionType,booleanwritable,intwriteLimit){this.writablewritable;this.writeLimitwriteLimit;// 创建压缩器可写模式下if(this.writablecompressionType!CompressionType.NONE)this.compressornewCompressor(buffer,compressionType);elsethis.bufferbuffer;}// 工厂方法publicstaticMemoryRecordsemptyRecords(ByteBufferbuffer,CompressionTypecompressionType,intwriteLimit){returnnewMemoryRecords(buffer,compressionType,true,writeLimit);}// 追加一条消息publiclongappend(longoffset,longtimestamp,byte[]key,byte[]value){if(!writable)thrownewIllegalStateException(MemoryRecords is not writable);// 写入消息的各个字段通过Compressor → DataOutputStream → ByteBufferintsizeRecord.sizeOf(key,value);compressor.putLong(offset);compressor.putInt(size);// 消息体使用CRC32校验longcrccompressor.putRecord(timestamp,key,value);compressor.recordWritten(sizeRecords.LOG_OVERHEAD);returncrc;}// 估算剩余空间不精确只是估算publicbooleanhasRoomFor(byte[]key,byte[]value){if(!this.writable)returnfalse;// 基于压缩率估算因子进行估算returnthis.compressor.estimateBytesWritten()Records.recordSize(key,value)this.writeLimit;}// 关闭写入转为只读模式publicvoidclose(){if(writable){// 关闭压缩流flush剩余数据if(compressor!null)this.compressor.close();// 切换到压缩后的ByteBuffer// 注意压缩可能导致Buffer扩容buffer和bufferStream.buffer可能不同this.bufferthis.compressor.buffer();this.writablefalse;}}publicintsizeInBytes(){// 如果是可写模式返回实际已写入大小// 如果是只读模式返回buffer的大小if(writable)returnthis.compressor.bufferStream().size();elsereturnthis.buffer.limit();}}二、RecordBatch——消息批次的管理外壳2.1 RecordBatch的核心字段publicfinalclassRecordBatch{publicfinalTopicPartitiontopicPartition;// 目标分区publicfinalProduceRequestResultproduceFuture;// 批次完成标志(CountDownLatch)publicintrecordCount;// 批次内消息数量publicintmaxRecordSize;// 批次内最大消息大小publicvolatileintattempts;// 发送尝试次数privatelonglastAppendTime;// 最后追加时间privatelongdrainedMs;// 被drain的时间privatelonglastAttemptMs;// 最后尝试发送时间publicfinalMemoryRecordsrecords;// 物理消息存储publicfinalListThunkthunks;// 回调队列(每个消息一个Thunk)privatelongoffsetCounter;// 批次内偏移量计数器(从0开始)privatebooleanretry;// 是否正在重试}2.2 Thunk——消息回调的载体// Thunk是RecordBatch的内部类封装了一条消息的元数据和回调finalstaticclassThunk{finalCallbackcallback;// 用户自定义回调finalFutureRecordMetadatafuture;// Future对象(用于同步等待结果)Thunk(Callbackcallback,FutureRecordMetadatafuture){this.callbackcallback;this.futurefuture;}}2.3 tryAppend()——尝试往Batch里追加消息publicFutureRecordMetadatatryAppend(longtimestamp,byte[]key,byte[]value,Callbackcallback,longnow){// 先估算Batch剩余空间够不够放这条消息if(!this.records.hasRoomFor(key,value)){returnnull;// 空间不够返回null让调用者创建新Batch}// 空间够了开始写入longchecksumthis.records.append(offsetCounter,// 批次内偏移量注意不是分区的全局offsettimestamp,key,value);// 更新统计信息maxRecordSizeMath.max(maxRecordSize,Record.recordSize(key,value));lastAppendTimenow;// 创建FutureRecordMetadata用于异步等待结果FutureRecordMetadatafuturenewFutureRecordMetadata(this.produceFuture,// 指向本批次的ProduceRequestResultthis.recordCount,// 消息在批次中的序号timestamp,checksum,keynull?-1:key.length,valuenull?-1:value.length);// 如果用户传了Callback封装成Thunk保存起来if(callback!null)thunks.add(newThunk(callback,future));this.recordCount;returnfuture;}2.4 done()——批次完成后的回调触发publicvoiddone(longbaseOffset,longtimestamp,RuntimeExceptionexception){// 遍历所有thunk逐个触发用户回调for(inti0;ithis.thunks.size();i){try{Thunkthunkthis.thunks.get(i);if(exceptionnull){// 正常完成构造RecordMetadata调用用户callbackRecordMetadatametadatanewRecordMetadata(this.topicPartition,// 分区baseOffset,// 服务端分配的基offsetthunk.future.relativeOffset(),// 消息在Batch内的相对偏移timestamp,thunk.future.checksum(),thunk.future.serializedKeySize(),thunk.future.serializedValueSize());thunk.callback.onCompletion(metadata,null);}else{// 异常完成metadata传nullthunk.callback.onCompletion(null,exception);}}catch(Exceptione){log.error(Error executing user-provided callback,e);}}// 标记批次完成CountDownLatch.countDown()唤醒同步等待的线程this.produceFuture.done(topicPartition,baseOffset,exception);}三、消息格式版本演进——V0→V1→V23.1 演进历程对比【Kafka消息格式版本演进】 V0 (Kafka 0.8.x) V1 (Kafka 0.10.0) V2 (Kafka 0.11.0) ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ CRC32 (4B) │ │ CRC32 (4B) │ │ Length (varint) │ │ Magic0 (1B) │ │ Magic1 (1B) │ │ Attributes(1B) │ │ Attributes(1B) │ │ Attributes(1B) │ │ TimestampDelta │ │ Key Len (4B) │ │ Timestamp (8B) │ ←新增 │ OffsetDelta │ │ Key (变长) │ │ Key Len (4B) │ │ Key Len (varint)│ │ Value Len (4B) │ │ Key (变长) │ │ Key (变长) │ │ Value (变长) │ │ Value Len (4B) │ │ Value Len(varint│ └─────────────────┘ │ Value (变长) │ │ Value (变长) │ └─────────────────┘ │ Headers Count │ Magic字段: 1 │ Headers (变长) │ └─────────────────┘ Magic字段: 23.2 各版本特性对比特性V0 (Magic0)V1 (Magic1)V2 (Magic2)Kafka版本0.8.x0.10.00.11.0时间戳❌ 无✅ 有CreateTime/LogAppendTime✅ 有压缩方式外层压缩外层压缩内层压缩每条消息独立消息头(Headers)❌ 无❌ 无✅ 有key-value扩展ID编码固定4字节固定4字节Varint变长编码省空间CRC校验✅ 整批CRC✅ 整批CRC✅ 逐条CRC在Record层面批量迭代效率低需逐条解析低高RecordBatch可直接跳过多条3.3 V2格式的重大改进Record BatchV2最大的变化是将消息集合从一批Message变成了一个Record Batch——批次级别的元数据独立出来了【V2 Record Batch 结构】 ┌────────────────────────────────────┐ │ Base Offset (8B) │ ← 本批次第一条消息的偏移量 │ Length (4B) │ ← 批次总长度 │ Partition Leader Epoch (4B) │ ← Leader版本号解决HW截断 │ Magic (1B) │ ← 固定为2 │ CRC (4B) │ ← 整个批次的CRC校验 │ Attributes (2B) │ ← 压缩类型、时间戳类型等 │ Last Offset Delta (4B) │ ← 最后一条消息的偏移量增量 │ First Timestamp (8B) │ ← 第一条消息的时间戳 │ Max Timestamp (8B) │ ← 最后一条消息的时间戳 │ Producer ID (8B) │ ← 幂等Producer ID │ Producer Epoch (2B) │ ← Producer Epoch │ Base Sequence (4B) │ ← 第一条消息的序列号 │ Records Count (4B) │ ← 批次内消息数量 ├────────────────────────────────────┤ │ Record 0 │ │ Record 1 │ │ Record 2 │ │ ... │ │ Record N │ └────────────────────────────────────┘ 每条Record内部V2: ┌────────────────────────────────────┐ │ Length (varint) │ ← 变长编码 │ Attributes (1B) │ │ Timestamp Delta (varint) │ ← 相对BaseTimestamp的增量 │ Offset Delta (varint) │ ← 相对BaseOffset的增量 │ Key Length (varint) │ │ Key (变长) │ │ Value Length (varint) │ │ Value (变长) │ │ Headers Count (varint) │ │ Headers (变长, 每个Header有Key-Value) │ └────────────────────────────────────┘Varint变长整数编码是V2省空间的关键——小数字用1字节大数字才用多字节。比如offsetDelta通常很小几条消息的差距用Varint只需要1字节而不是4字节。四、四种压缩算法全面对比4.1 压缩时机【Kafka消息压缩流程】 Producer端: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ 原始消息 │ ──► │ 批量收集 │ ──► │ 整批压缩 │ ──► 发送压缩后的Batch └──────────┘ └──────────┘ └──────────┘ ↑ RecordAccumulator攒消息 (batch.size条/linger.ms时间) Broker端: ┌──────────────┐ │ 原样存储压缩数据│ ←── Broker不解压直接写入磁盘 └──────────────┘ Consumer端: ┌──────────────┐ ┌──────────┐ │ 拉取压缩Batch │ ──► │ 解压还原 │ ──► 返回原始消息给业务代码 └──────────────┘ └──────────┘关键点Broker不解压也不重新压缩消息在Producer端压缩后一路原样传输到Consumer端。这让Broker完全无感知避免了服务器端CPU负担。4.2 四种算法详细对比对比维度GZIPSnappyLZ4ZSTD压缩率最高 (~75%)中等 (~50%)中等 (~50%)极高 (~80%)压缩速度慢快极快中等解压速度中等极快极快快CPU开销高低极低中等JDK内置✅ 内置❌ 需引入❌ 需引入❌ 需引入适用场景带宽紧张低延迟高性能平衡最优Kafka版本所有版本0.8.20.8.22.1.04.3 选型决策建议【压缩算法选择决策树】 你的瓶颈是什么 │ ├── 带宽紧张 ──► 选 GZIP 或 ZSTD压缩率高省带宽 │ ┌─ 能接受更高CPU开销 → GZIP │ └─ CPU也紧张 → ZSTD压缩率更高解压也快 │ ├── CPU紧张 ──► 选 LZ4 或 SnappyCPU开销极低 │ ┌─ Kafka 2.1 → LZ4整体性能最优 │ └─ 老版本Kafka → Snappy │ └── 延迟敏感 ──► 选 LZ4压缩解压最快本篇小结MemoryRecords和RecordBatch是Kafka消息的物理载体和管理外壳MemoryRecords底层用ByteBuffer存储消息数据通过Compressor装饰器链ByteBufferOutputStream → DataOutputStream → 压缩流实现分层处理。hasRoomFor()的估算不是精确值实际写入时可能触发扩容RecordBatch管理一个批次中所有消息的生命周期。tryAppend()追加消息done()触发所有回调。每个消息的Callback通过Thunk对象与FutureRecordMetadata绑定消息格式演进V0→V1→V2的进化方向是省空间Varint变长编码 增加功能时间戳、Headers、幂等支持。V2的Record Batch格式将批次元数据和消息数据分层迭代效率大幅提升压缩策略Producer压缩→Broker原样存储→Consumer解压端到端的压缩。ZSTD综合最优LZ4速度最快GZIP压缩率最高消息怎么存的搞清楚了但ByteBuffer的内存从哪来的下一篇我们看看BufferPool这位内存管家是怎么复用内存、避免GC的。上一篇【第16篇】RecordAccumulator源码深度解析——Kafka生产者的消息缓冲区秘密下一篇【第18篇】BufferPool源码解析——Kafka生产者的内存管家