【Kafka源码解读和使用指南】第77篇:Kafka MirrorMaker2实战——跨集群数据同步从入门到精通
上一篇【第76篇】Kafka与Flink集成实战——大数据实时计算的黄金组合下一篇【第78篇】Kafka生态全景图——与大数据技术栈的完美融合摘要假设你有一个主Kafka集群在上海机房现在要在深圳机房建一个灾备集群——怎么把数据同步过去用MirrorMaker2。MirrorMaker2简称MM2是Kafka 2.4提供的官方跨集群数据同步工具。它彻底革新了MM1的架构基于Kafka Connect框架实现支持双向同步、Topic自动发现、Offset翻译等高级特性。本文带你走通从MM1到MM2的演进之路详解MM2的双向镜像配置、Topic过滤、Offset同步以及生产部署的关键注意事项。一、MirrorMaker的演进——MM1到MM2MM1的问题MirrorMaker1是Kafka最早的数据镜像工具2014年就有了但它的问题太多【MM1的架构缺陷】 源集群(上海) 目标集群(深圳) ┌─────────────┐ ┌─────────────┐ │ Topic A │ consume │ Topic A │ │ Topic B │─────────→│ Topic B │ │ Topic C │ produce │ Topic C │ └─────────────┘ └─────────────┘ ▲ │ ┌──┴──────────────────────────┐ │ MirrorMaker1单进程 │ │ Consumer 拉 Producer 推 │ │ │ │ 问题 │ │ 1. 单点挂了整个同步停掉 │ │ 2. 无Topic自动发现 │ │ 3. Offset不同步消费进度丢失│ │ 4. 无Rebalance机制 │ │ 5. 监控能力弱 │ └─────────────────────────────┘MM2的革命性改进MirrorMaker2完全基于Kafka Connect框架重写天然获得Connect的所有能力能力MM1MM2框架基础自定义ConsumerProducerKafka Connect分布式部署❌ 单进程✅ 多Worker集群高可用❌ 挂了就停✅ 自动RebalanceTopic自动发现❌ 手动配置列表✅ 正则匹配自动同步Offset同步❌ 不支持✅ Offset翻译同步双向同步❌ 需对等部署✅ 原生支持Connector管理❌ 无✅ Connect REST API监控指标弱✅ JMX指标动态配置❌ 需重启✅ REST API热更新【MM2 架构】 Kafka Connect Cluster ┌──────────────────────────────┐ │ Worker 1 Worker 2 │ │ ┌────────┐ ┌────────┐ │ │ │Mirror │ │Mirror │ │ │ │Source │ │Source │ │ │ │Conn. │ │Conn. │ │ │ └───┬────┘ └───┬────┘ │ │ │replicate │ │ └──────┼───────────┼────────────┘ │ │ ┌────────────┴───┐ ┌───┴──────────────┐ │ 源集群(上海) │ │ 目标集群(深圳) │ │ │ │ │ │ Topic A ────────┼──► sh.topic_a │ │ Topic B ────────┼──► sh.topic_b │ │ │ │ │ │ sh.offset_sync │ │ offset-syncs │ │ ◄───────────┼─── (offset翻译Topic) │ └─────────────────┘ └───────────────────┘二、双向镜像配置详解双向镜像Active-Active是两个集群互相同步数据任一集群既是源也是目标。完整配置文件# # connect-mirror-maker.properties # MirrorMaker2 的完整配置 # # ---- Connect Worker 基础配置 ---- bootstrap.serverssh-kafka1:9092,sh-kafka2:9092,sh-kafka3:9092 group.idmm2-cluster key.converterorg.apache.kafka.connect.converters.ByteArrayConverter value.converterorg.apache.kafka.connect.converters.ByteArrayConverter # Offset存储TopicConnect内部用 offset.storage.topicmm2-offsets offset.storage.replication.factor3 # 配置存储Topic config.storage.topicmm2-configs config.storage.replication.factor3 # 状态存储Topic status.storage.topicmm2-status status.storage.replication.factor3 # ---- 集群别名定义 ---- # 为每个集群定义一个别名后面配置用 clusterssh, sz # 上海集群连接信息 sh.bootstrap.serverssh-kafka1:9092,sh-kafka2:9092,sh-kafka3:9092 # 深圳集群连接信息 sz.bootstrap.serverssz-kafka1:9092,sz-kafka2:9092,sz-kafka3:9092 # ---- 双向镜像配置 ---- # 上海 → 深圳 sh-sz.enabledtrue # 深圳 → 上海 sz-sh.enabledtrue # ---- Topic过滤正则 ---- # 同步所有匹配正则的Topic排除内部Topic sh-sz.topicsorders.*|payments.*|users.* # 排除内部Topic、测试Topic topics.exclude.*[\\-\\_]internal.*, .*test.*, __consumer_offsets # ---- Sync Group Offsets同步消费者组偏移量 ---- # 关键特性同步消费者组的offset支持灾备切换 sh-sz.groupsorder-service|payment-service|.*-consumer sz-sh.groupsorder-service|payment-service|.*-consumer # 启用消费者组同步 sh-sz.sync.group.offsets.enabledtrue sz-sh.sync.group.offsets.enabledtrue # Offset同步间隔 sh-sz.sync.group.offsets.interval.seconds60 # ---- 刷新间隔 ---- # 每30秒检查一次新Topic sh-sz.refresh.topics.interval.seconds30 sz-sh.refresh.topics.interval.seconds30 # 每60秒检查一次新消费者组 sh-sz.refresh.groups.interval.seconds60 # ---- 吞吐量控制 ---- # 每秒最多同步10000条消息防止打爆目标集群 sh-sz.tasks.max4 sz-sh.tasks.max4 # ---- 内部Topic配置 ---- # 控制内部Topic的副本因子 checkpoints.topic.replication.factor3 heartbeats.topic.replication.factor2 offset-syncs.topic.replication.factor3 # Offset翻译的副本因子 sh-sz.offset-syncs.topic.replication.factor3启动MM2# 使用connect-mirror-maker脚本启动bin/connect-mirror-maker.sh config/connect-mirror-maker.properties# 或者作为Connect集群的一部分启动从Kafka 2.7开始# 方式1独立启动bin/connect-mirror-maker.sh connect-mirror-maker.properties# 方式2通过Connect REST API创建curl-XPOST http://localhost:8083/connectors\-HContent-Type: application/json\-d{ name: mm2-sh-to-sz, config: { connector.class: org.apache.kafka.connect.mirror.MirrorSourceConnector, source.cluster.alias: sh, target.cluster.alias: sz, topics: orders.*, tasks.max: 4 } }三、Topic自动重命名——防止循环同步的大杀器双向镜像中最大的问题是循环同步上海把一条消息同步到深圳深圳又把它同步回上海……MM2通过Topic前缀机制自动解决【Topic前缀机制】 源集群(上海) 目标集群(深圳) Topic: orders.created Topic: sh.orders.created ↑ ↑ 原始名称 自动添加源集群前缀 好处 1. 上海的orders.created → 深圳的sh.orders.created 2. 深圳不会把sh.orders.created同步回上海因为有前缀 3. 避免了死循环 命名规则 {source_cluster_alias}.{original_topic_name}Offset翻译——灾备切换的关键MM2不仅同步Topic数据还同步消费者组Offset。当你需要在灾备时切换消费者【Offset翻译机制】 上海集群 Consumer Group order-service Topic orders.created, Partition 0 → Offset: 15320 深圳集群备份 通过offset-syncs Topic记录映射关系 上海::orders.created::order-service → 深圳::sh.orders.created::order-service 灾备切换后 Flink Job从深圳的sh.orders.created消费 自动从对应Offset开始不会丢失进度 ── 这就是MM2比MM1强100倍的地方四、生产部署注意事项4.1 网络带宽评估【跨集群同步的带宽估算】 假设 - 消息平均大小1KB - 消息速率100,000条/秒 - 集群之间网络带宽1Gbps 所需带宽 1KB × 100,000 × 8bits 800Mbps 带宽利用率 800Mbps / 1000Mbps 80% 风险评估 - 70%利用率 → 有风险高峰期可能打满 - 解决方案增加网络带宽或降低同步速率4.2 MM2部署拓扑【推荐部署拓扑】 方案A非对称部署推荐 主集群(上海) ← 部署MM2 Worker → 备集群(深圳) 优点MM2只部署在一边管理简单 缺点MM2挂了需要恢复 方案B对称部署 主集群(上海) ← MM2-A → 备集群(深圳) ↑ ↑ └───── MM2-B ─────────┘ 优点高可用任一边挂了另一边继续 缺点配置复杂需要协调 生产推荐方案A MM2多Worker至少2个实现HA4.3 延迟与一致性【异地同步延迟的现实】 上海 → 深圳 网络延迟~30ms 同步延迟网络RTT Kafka处理时间 ≈ 50-100ms 这意味着 - 不要期望异地集群的数据是实时一致的 - 灾难切换时可能有秒级到分钟级的数据丢失 - 金融场景不能仅依赖异地Kafka同步保证一致性4.4 安全配置# MM2连接两个集群都需要的安全配置 # 上海集群安全 sh.security.protocolSASL_SSL sh.sasl.mechanismSCRAM-SHA-256 sh.sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule \ required usernamemm2 passwordmm2-secret; # 深圳集群安全 sz.security.protocolSASL_SSL sz.sasl.mechanismSCRAM-SHA-256 sz.sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule \ required usernamemm2 passwordmm2-secret-sz;4.5 监控关键指标# 检查MM2 Connector状态curlhttp://localhost:8083/connectors/mm2-sh-to-sz/status# 关键监控指标指标含义告警阈值MirrorSourceConnector.record-count已同步记录数连续5分钟不增 → 告警MirrorSourceConnector.replication-latency-ms同步延迟 5秒 → 告警MirrorCheckpointConnector.checkpoint-latency-msOffset翻译延迟 10秒 → 告警Connector StatusConnector状态FAILED → 紧急五、MM2 vs uReplicator vs Confluent Replicator如果你不是只用开源方案还有两个选择维度MirrorMaker2uReplicator (Uber)Confluent Replicator开源✅ Apache 2.0✅ Apache 2.0❌ 需Confluent授权框架Kafka ConnectHelix KafkaKafka ConnectTopic发现✅ 正则匹配✅ 正则匹配✅ 正则匹配Offset同步✅✅✅Schema同步❌ 需额外配置❌✅ Schema Registry同步Web UI❌ 仅REST API✅ 有Dashboard✅ Confluent Control Center社区支持✅ Kafka官方中等Uber维护✅ 商业支持适用场景通用大规模生产Uber内部验证企业级需要额外功能本篇小结MirrorMaker2是Kafka生态中最成熟的跨集群数据同步工具架构跃升从MM1的单进程ConsumerProducer → 基于Kafka Connect的分布式Worker集群获得HA、自动发现、动态配置能力双向镜像通过Topic前缀机制自动避免循环同步源集群Topicorders→ 目标集群sh.ordersOffset翻译是灾备切换的核心消费者组Offset自动同步切换集群时不会丢失消费进度生产部署三要素0)网络带宽至少预留30%余量1)MM2多Worker部署保证HA2)异地延迟50-100ms是物理瓶颈不可消灭如果你在用开源Kafka做跨集群同步MM2就是最佳选择没有之一。上一篇【第76篇】Kafka与Flink集成实战——大数据实时计算的黄金组合下一篇【第78篇】Kafka生态全景图——与大数据技术栈的完美融合