一、从批处理到流处理1.1 批处理 vs 流处理的本质区别在上一篇中我们使用readTextFile(input/words.txt)读取文件数据这本质上是一种有界流Bounded Stream—— 数据量有限读取完即结束。而真正的流处理面对的是无界流Unbounded Stream—— 数据持续产生没有尽头。1.2 流处理的核心特征事件驱动Event-Driven└── 数据到来才触发计算没有数据时程序阻塞等待持续运行Continuous└── 程序启动后不会自动结束除非手动取消低延迟Low Latency└── 来一条处理一条毫秒级响应有状态Stateful└── 需要维护中间计算状态如WordCount的累计值二、Socket文本流最简单的流数据源2.1 为什么用Socket在实际生产环境中数据流是无界的有开始但没有结束。为了模拟这种场景我们可以监听一个Socket端口然后向该端口不断发送数据。2.2 环境准备Linux环境发送端# 使用ncnetcat命令监听7777端口# -l: 监听模式# -k: 保持连接允许多个客户端连接$nc-lk7777注意需要先启动Socket端口再启动Flink程序否则会报超时连接异常。三、流处理WordCount完整实现3.1 代码实现创建SocketStreamWordCount.javapackagecom.atguigu.wc;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/** * Socket文本流实时WordCount * 演示真正的流处理无界数据流、事件驱动、持续运行 */publicclassSocketStreamWordCount{publicstaticvoidmain(String[]args)throwsException{// 1. 创建流式执行环境StreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取Socket文本流// 参数1发送端主机名或IP地址// 参数2端口号DataStreamSourceStringlineStreamenv.socketTextStream(hadoop102,7777);// 3. 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String,LongsumlineStream// 3.1 Lambda表达式实现flatMap.flatMap((Stringline,CollectorTuple2String,Longout)-{String[]wordsline.split( );for(Stringword:words){out.collect(Tuple2.of(word,1L));}})// 3.2 必须显式声明返回类型Java泛型擦除.returns(Types.TUPLE(Types.STRING,Types.LONG))// 3.3 按单词分组.keyBy(data-data.f0)// 3.4 按count字段索引1求和.sum(1);// 4. 打印结果sum.print();// 5. 执行流处理程序会持续运行等待数据到来env.execute();}}3.2 与批处理代码的对比对比项批处理文件流处理Socket数据源readTextFile(input/words.txt)socketTextStream(hadoop102, 7777)数据特性有界流读完即结束无界流持续监听程序行为执行完自动退出持续运行不会自动退出触发方式数据已存在立即处理事件驱动来一条处理一条输出特点一次性输出最终结果每来一条数据增量更新输出四、运行演示4.1 启动顺序重要步骤1先启动Socket端口发送端 步骤2再启动Flink程序接收端 步骤3在Socket端输入数据观察Flink输出4.2 具体操作终端1 - 启动Socket发送端$nc-lk7777终端2 - 启动Flink程序# 在IDEA中直接运行SocketStreamWordCount.main()# 或在项目目录执行$ mvn exec:java-Dexec.mainClasscom.atguigu.wc.SocketStreamWordCount程序启动后控制台没有任何输出也不会退出。这是正常的因为流处理是事件驱动的当前程序处于监听状态。4.3 输入数据与输出结果在终端1Socket端输入数据# 输入第1条数据 hello flink终端2Flink程序输出13 (flink,1) 5 (hello,1)在终端1继续输入# 输入第2条数据 hello world终端2增量输出2 (world,1) 5 (hello,2)4.4 输出结果分析┌─────────────────────────────────────────────────────────────┐ │ 输出结果解读 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 输入hello flink │ │ 输出 │ │ 13 (flink,1) ← 第13号并行子任务输出flink出现1次 │ │ 5 (hello,1) ← 第5号并行子任务输出hello出现1次 │ │ │ │ 输入hello world │ │ 输出 │ │ 2 (world,1) ← 第2号并行子任务输出world出现1次 │ │ 5 (hello,2) ← 第5号并行子任务输出hello累计出现2次 │ │ │ │ 关键发现 │ │ • hello被分配到第5号子任务累计计数从1增加到2 │ │ • flink被分配到第13号子任务 │ │ • world被分配到第2号子任务 │ │ • 相同key的数据会被分配到同一个并行子任务 │ │ │ └─────────────────────────────────────────────────────────────┘五、深入理解Lambda表达式的类型推断5.1 问题现象在flatMap中使用Lambda表达式时如果去掉.returns()方法// 错误写法缺少returns()SingleOutputStreamOperatorTuple2String,LongsumlineStream.flatMap((Stringline,CollectorTuple2String,Longout)-{// ... 省略处理逻辑})// .returns(Types.TUPLE(Types.STRING, Types.LONG)) // 注释掉这行.keyBy(data-data.f0).sum(1);5.2 报错信息Exception in thread main org.apache.flink.api.common.functions.InvalidTypesException: The return type of function main(SocketStreamWordCount.java:28) could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the flatMap operation.5.3 原因深度解析┌─────────────────────────────────────────────────────────────┐ │ Java泛型擦除与Flink类型提取系统 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Java泛型擦除机制 │ │ │ │ 编译前 │ │ ListString list new ArrayList(); │ │ │ │ 编译后字节码 │ │ List list new ArrayList(); // 类型信息被擦除为原始类型 │ │ │ │ 2. Flink的类型提取系统 │ │ │ │ • 可以分析函数的输入和返回类型 │ │ • 自动获取TypeInformation生成序列化器和反序列化器 │ │ • 这是Flink高效处理数据的基础 │ │ │ │ 3. Lambda表达式的特殊情况 │ │ │ │ 普通匿名类 │ │ new FlatMapFunctionString, Tuple2String, Long() { │ │ // 编译器保留泛型信息在类签名中 │ │ } │ │ → Flink可以通过反射获取完整类型信息 │ │ │ │ Lambda表达式 │ │ (String line, CollectorTuple2String, Long out) - { │ │ // 编译后类型信息完全擦除 │ │ } │ │ → Flink只能推断出返回的是 Tuple2 类型 │ │ → 无法得到 Tuple2String, Long 的完整信息 │ │ │ │ 类比 │ │ 只知道船头、船身、船尾的构成 │ │ 但无法重建大船的完整模样 │ │ │ │ 4. 解决方案显式提供类型信息 │ │ │ │ .returns(Types.TUPLE(Types.STRING, Types.LONG)) │ │ → 明确告诉Flink返回的是 Tuple2String, Long │ │ │ └─────────────────────────────────────────────────────────────┘5.4 三种解决方式对比方式代码示例适用场景returns().returns(Types.TUPLE(Types.STRING, Types.LONG))Lambda表达式最常用匿名类new FlatMapFunctionString, Tuple2String, Long()无需returns代码稍长TypeHintreturns(new TypeHintTuple2String, Long(){})复杂泛型场景六、流处理的事件驱动特性6.1 程序执行状态分析┌─────────────────────────────────────────────────────────────┐ │ SocketStreamWordCount执行状态 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 启动阶段 │ │ ┌─────────────────────────────────────┐ │ │ │ 1. 创建执行环境 │ │ │ │ 2. 构建数据流图StreamGraph │ │ │ │ 3. 调用socketTextStream建立连接 │ │ │ │ 4. 调用execute()提交作业 │ │ │ │ 5. 等待数据到来... │ ← 程序阻塞在此 │ │ └─────────────────────────────────────┘ │ │ │ │ 运行阶段事件驱动 │ │ ┌─────────────────────────────────────┐ │ │ │ Socket端口收到数据 │ │ │ │ ↓ │ │ │ │ 触发flatMap转换 │ │ │ │ ↓ │ │ │ │ 触发keyBy分组 │ │ │ │ ↓ │ │ │ │ 触发sum聚合更新状态 │ │ │ │ ↓ │ │ │ │ 输出结果到控制台 │ │ │ │ ↓ │ │ │ │ 继续等待下一条数据... │ ← 循环等待 │ │ └─────────────────────────────────────┘ │ │ │ │ 终止条件 │ │ • 手动取消作业IDEA停止/CtrlC │ │ • Socket连接断开 │ │ • 程序异常 │ │ │ └─────────────────────────────────────────────────────────────┘6.2 与批处理的执行差异┌─────────────────────────────────────────────────────────────┐ │ 批处理 vs 流处理执行流程对比 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 批处理文件WordCount │ │ │ │ 开始 → 读取全部数据 → 处理全部数据 → 输出结果 → 结束 │ │ │ │ │ │ └──────────────── 一次性完成 ───────────────────────┘ │ │ │ │ 流处理Socket WordCount │ │ │ │ 开始 → 等待数据 → 处理数据 → 输出结果 → 等待数据 → 处理... │ │ │ ↑________________________________________│ │ │ └────────── 循环执行永不结束除非手动停止 ───────────┘ │ │ │ │ 关键差异 │ │ • 批处理数据先存在后处理Pull模式 │ │ • 流处理数据到来即处理Push模式 │ │ │ └─────────────────────────────────────────────────────────────┘七、生产环境数据源演进7.1 从Socket到KafkaSocket文本流仅用于测试生产环境推荐使用专业消息队列┌─────────────────────────────────────────────────────────────┐ │ 生产环境数据源演进 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 开发测试阶段 生产环境阶段 │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ Socket文本流 │ ────────▶ │ Apache Kafka │ │ │ │ • 简单快速 │ │ • 高吞吐量 │ │ │ │ • 不稳定 │ │ • 高可用 │ │ │ │ • 易丢失数据 │ │ • 可持久化 │ │ │ │ • 无重放能力 │ │ • 可重放数据 │ │ │ └──────────────┘ └──────────────┘ │ │ │ │ Flink Kafka Connector │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ KafkaSource.Stringbuilder() │ │ │ │ .setBootstrapServers(hadoop102:9092) │ │ │ │ .setTopics(topic_1) │ │ │ │ .setGroupId(atguigu) │ │ │ │ .setStartingOffsets(OffsetsInitializer.latest())│ │ │ │ .build(); │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘7.2 后续数据源专题预告数据源适用场景特点集合单元测试env.fromCollection(Arrays.asList(1, 2, 3))文件批处理、历史数据env.readTextFile(path)Socket开发测试env.socketTextStream(host, port)Kafka生产环境实时数据KafkaSource.builder()DataGen性能测试、无数据源时DataGeneratorSource八、常见问题总结Q1程序启动后没有输出是不是卡住了不是卡住是正常等待数据流处理程序启动后会持续监听Socket端口只有接收到数据才会触发计算并输出。这是Flink事件驱动的正常表现。Q2为什么必须先启动Socket再启动Flink程序如果先启动Flink程序它会尝试连接Socket端口。如果端口未启动会报连接超时异常java.net.ConnectException: Connection refusedQ3如何优雅地停止流处理程序方式1IDEA中点击停止按钮发送SIGTERM信号 方式2终端按CtrlC 方式3调用env.close()或取消作业Q4输出结果前面的数字是什么意思数字表示并行子任务编号如5说明该结果由第5个并行子任务输出。可以通过设置全局并行度来减少env.setParallelism(1);// 全局并行度设为1输出更简洁Q5Socket方式数据会丢失吗会Socket是简单的TCP连接没有持久化机制Flink程序未启动时发送的数据会丢失网络中断时数据会丢失生产环境必须使用Kafka等持久化消息队列九、总结9.1 本文核心要点要点内容流处理本质处理无界数据流事件驱动持续运行Socket数据源socketTextStream(host, port)仅用于测试启动顺序必须先启动Socket端口再启动Flink程序事件驱动数据到来才触发计算无数据时阻塞等待Lambda类型必须加returns()解决泛型擦除问题增量输出每来一条数据更新并输出累计结果9.2 流处理 vs 批处理对比总结特性批处理流处理数据边界有界无界执行方式一次性持续运行触发机制数据已存在主动处理事件驱动来一条处理一条延迟分钟/小时级毫秒/秒级状态管理无需或简单必须持续更新适用场景离线计算、报表实时监控、实时推荐9.3 流批统一的价值再认识┌─────────────────────────────────────────────────────────────┐ │ Flink流批统一的真正含义 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 不是流和批都能做而是同一套代码处理两种数据 │ │ │ │ 代码层面 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ DataStreamString stream env.readTextFile(...) │ │ │ │ DataStreamString stream env.socketTextStream(...)│ │ │ │ // 后续处理逻辑完全相同 │ │ │ │ .flatMap(...).keyBy(...).sum(...).print(); │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 执行层面 │ │ • 有界数据源文件→ 批模式或流模式处理完即结束 │ │ • 无界数据源Socket/Kafka→ 流模式持续运行 │ │ │ │ 价值开发一次到处运行维护一套覆盖所有场景 │ │ │ └─────────────────────────────────────────────────────────────┘如果本文对你有帮助欢迎点赞、收藏、关注有任何问题欢迎在评论区留言讨论。专栏持续更新中关注不迷路