物联网 基于netty构建mqtt协议规范(发布/订阅模式)简述源码(netty-sample-03-PubSub)核心角色与工作流程整体流程sub代码broker代码pub代码验证结果物联网 基于netty构建mqtt协议规范(发布/订阅模式)简述提供一对多的消息分发和消息生产/消费方的解耦发布者无需知道订阅者的存在两者通过 Broker 间接通信源码(netty-sample-03-PubSub)https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-03核心角色与工作流程角色职责订阅者Subscriber向 Broker 发送 SUBSCRIBE 请求注册对某个主题的兴趣接收来自 Broker 的推送消息。发布者Publisher向 Broker 发送 PUBLISH 请求将消息发送到指定主题。Broker代理维护主题与订阅者列表的映射接收发布的消息并转发给对应主题的所有订阅者。整体流程┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ Publisher │ ──────► │ Broker │ ◄────── │ Subscriber │ │ (8888) │ 发布 │ (消息总线) │ 订阅 │ (9999) │ └─────────────┘ └──────────────┘ └──────────────┘ 端口: 7777sub代码package com.jysemel.iot.subscriber; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class SubscriberServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup group new NioEventLoopGroup(); try { Bootstrap bootstrap new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ChannelPipeline p ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder(StandardCharsets.UTF_8)); p.addLast(new StringEncoder(StandardCharsets.UTF_8)); p.addLast(new SubscriberHandler()); } }); ChannelFuture future bootstrap.connect(127.0.0.1, 7777).sync(); System.out.println(); System.out.println( Subscriber 客户端已启动); System.out.println(); System.out.println(已连接到 Broker: 127.0.0.1:7777); System.out.println(功能: 订阅主题并接收消息); System.out.println(命令格式: SUB topic); System.out.println(示例: SUB news); System.out.println(输入 quit 退出); System.out.println(\n); Channel channel future.channel(); Scanner scanner new Scanner(System.in); while (true) { String input scanner.nextLine(); if (quit.equalsIgnoreCase(input)) { break; } channel.writeAndFlush(input \n); } } finally { group.shutdownGracefully(); } } }broker代码package com.jysemel.iot.broker; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.nio.charset.StandardCharsets; public class BrokerServer { public void start() throws InterruptedException { EventLoopGroup bossGroup new NioEventLoopGroup(1); EventLoopGroup workerGroup new NioEventLoopGroup(); try { ServerBootstrap bootstrap new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ChannelPipeline p ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder(StandardCharsets.UTF_8)); p.addLast(new StringEncoder(StandardCharsets.UTF_8)); p.addLast(new BrokerHandler()); } }); ChannelFuture future bootstrap.bind(7777).sync(); System.out.println(); System.out.println( Broker 服务器启动); System.out.println(); System.out.println(端口: 7777); System.out.println(功能: 消息路由和分发充当 Redis 角色); System.out.println(支持命令:); System.out.println( - PUB topic message : 发布消息); System.out.println( - SUB topic : 订阅主题); System.out.println( - UNSUB topic : 取消订阅); System.out.println(\n); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new BrokerServer().start(); } }pub代码package com.jysemel.iot.publisher; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class PublisherServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup group new NioEventLoopGroup(); try { Bootstrap bootstrap new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ChannelPipeline p ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder(StandardCharsets.UTF_8)); p.addLast(new StringEncoder(StandardCharsets.UTF_8)); p.addLast(new PublisherHandler()); } }); ChannelFuture future bootstrap.connect(127.0.0.1, 7777).sync(); System.out.println(); System.out.println( Publisher 客户端已启动); System.out.println(); System.out.println(已连接到 Broker: 127.0.0.1:7777); System.out.println(功能: 发布消息到指定主题); System.out.println(命令格式: PUB topic message); System.out.println(示例: PUB news Hello World); System.out.println(输入 quit 退出); System.out.println(\n); Channel channel future.channel(); Scanner scanner new Scanner(System.in); while (true) { String input scanner.nextLine(); if (quit.equalsIgnoreCase(input)) { break; } channel.writeAndFlush(input \n); } } finally { group.shutdownGracefully(); } } }验证结果