Flink Quick Start: Creating a Project and a Batch WordCount
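1. Environment Preparation

1.1 Development Environment Requirements

| Component | Version | Notes |
|---|---|---|
| JDK | 1.8+ | Flink 1.17 supports Java 8 (deprecated) and Java 11 |
| Maven | 3.6+ | Project build tool |
| IDEA | 2020+ | IntelliJ IDEA recommended |
| Flink | 1.17.0 | Version used in this article |

1.2 Check the Java Environment

```
$ java -version
openjdk version "1.8.0_352"
OpenJDK Runtime Environment (build 1.8.0_352-b08)
OpenJDK 64-Bit Server VM (build 25.352-b08, mixed mode)
```

2. Create the Flink Maven Project

2.1 Create a New Maven Project

Open IntelliJ IDEA and follow these steps:

Step 1: Click File → New → Project and select the Maven project type.
Step 2: Fill in the project information:
- Name: FlinkTutorial
- GroupId: com.atguigu
- ArtifactId: FlinkTutorial
- Version: 1.0-SNAPSHOT
Step 3: Choose where to store the project and click Finish.

2.2 Project Structure

After creation, the project looks like this:

```
FlinkTutorial/
├── pom.xml              # Maven configuration file
└── src/
    ├── main/
    │   ├── java/        # Java source directory
    │   └── resources/   # Resource directory
    └── test/
        └── java/        # Test code directory
```

3. Add the Flink Dependencies

3.1 Configure pom.xml

Add the core Flink dependencies to pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Flink version -->
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <!-- Flink streaming core (DataStream API) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client: runs jobs locally from the IDE and submits them to a cluster -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
```

3.2 Dependency Notes

| Dependency | Purpose | Required? |
|---|---|---|
| flink-streaming-java | Core of the DataStream API | ✅ Required |
| flink-clients | Local execution and cluster submission | Needed to run jobs from the IDE; without it, execution fails with "No ExecutorFactory found to execute the application" |

Refresh Maven: after editing pom.xml, click the Reload button in IDEA's Maven tool window on the right, or right-click the project and choose Maven → Reload Project.

4. Data Preparation

4.1 Create the Input Directory and File

In the project root, create an input folder containing a words.txt file:

```
FlinkTutorial/
├── input/
│   └── words.txt            # Input data file
├── pom.xml
└── src/
    └── main/
        └── java/
            └── com/
                └── atguigu/
                    └── wc/  # Package created later
```

4.2 Input Data

Put the following content in words.txt:

```
hello flink
hello world
hello java
```

5. Batch WordCount with the DataSet API (for reference only)

5.1 Implementation

⚠️ Note: the DataSet API has been soft-deprecated since Flink 1.12, and the official recommendation is to use the DataStream API instead. This section only illustrates how Flink's APIs evolved.

Create the package com.atguigu.wc and add BatchWordCount.java:

```java
package com.atguigu.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch WordCount with the DataSet API (deprecated, for reference only)
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file line by line; each element is one line of text
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. Split each line into words and turn every word into a (word, 1) tuple
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        // Split the line on spaces
                        String[] words = line.split(" ");
                        for (String word : words) {
                            // Emit (word, 1)
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                });

        // 4. Group by word (field 0 of the tuple)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. Aggregate within each group: sum field 1 of the tuple
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. Print the result (in the DataSet API, print() also triggers execution)
        sum.print();
    }
}
```

5.2 Output

```
(flink,1)
(world,1)
(hello,3)
(java,1)
```

6. Batch WordCount with the DataStream API (recommended)

6.1 Unified Stream/Batch Design

Since Flink 1.12, the official recommendation is to use the DataStream API directly and to run batch jobs by setting the execution mode to BATCH.

6.2 Implementation

Create StreamWordCount.java:

```java
package com.atguigu.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * WordCount with the DataStream API (unified stream/batch).
 * The file is a bounded stream; since no mode is set, it runs in the default STREAMING mode.
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file (a bounded source)
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. Transform, group and sum to get the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream
                // 3.1 flatMap: split each line into words and emit (word, 1)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // 3.2 Explicit type hint: optional for an anonymous class (Flink can read its
                //     generic types), but mandatory for the lambda version in section 7
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // 3.3 keyBy: group by word, using a lambda as the key selector
                .keyBy(data -> data.f0)
                // 3.4 sum: sum field 1 (0-based index, i.e. the count)
                .sum(1);

        // 4. Print the result
        sum.print();

        // 5. Execute: the DataStream API requires an explicit execute() call
        env.execute();
    }
}
```

6.3 Output

```
3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)
```

Explanation: the number before > (such as 3 or 5) is the 1-based index of the parallel subtask that emitted the record, which reflects Flink's parallel processing. After keyBy, all records for the same word go to the same subtask, which is why every hello line carries 5>. Because the job runs in the default STREAMING mode, sum emits an updated count every time a word arrives: hello appears three times, and its last value (hello,3) matches the DataSet result in 5.2.

6.4 Comparison with the DataSet API

| Aspect | DataSet API (deprecated) | DataStream API (recommended) |
|---|---|---|
| Execution environment | ExecutionEnvironment | StreamExecutionEnvironment |
| Grouping | groupBy(0) | keyBy(data -> data.f0) |
| Type hints | Inferred from anonymous classes; lambdas need returns() | Same: inferred from anonymous classes; lambdas need returns() |
| Triggering execution | Lazy; print() triggers execution by itself | Must call env.execute() |
| Parallelism | Global (env.setParallelism()) or per operator | Same: global or per operator |

7. Simplified Version with Lambda Expressions

Use a lambda expression to simplify flatMap:

```java
package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * WordCount simplified with lambda expressions
 */
public class LambdaWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream
                // flatMap implemented as a lambda
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // With a lambda, the output type must be declared explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        sum.print();

        env.execute();
    }
}
```

8. Setting Batch Mode

8.1 In Code

```java
package com.atguigu.wc;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Switch to batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // The rest of the pipeline is identical to the streaming version...
    }
}
```

In BATCH mode, the keyed sum emits only the final count for each word, so the output contains (hello,3) once, like the DataSet result in 5.2, instead of the incremental updates shown in 6.3.

8.2 On the Command Line

```
# Specify batch mode when submitting the job
$ bin/flink run -Dexecution.runtime-mode=BATCH -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
```

Keeping the mode out of the code lets the same jar run in either mode; the Flink documentation recommends this approach.

8.3 Comparison of the Three Execution Modes

| Mode | Use case | Characteristics |
|---|---|---|
| STREAMING | Unbounded streams (Kafka, Socket, etc.) | Default mode; processes data continuously |
| BATCH | Bounded streams (files, collections, etc.) | The job finishes once all data has been read |
| AUTOMATIC | Source boundedness not known in advance | Chooses BATCH if all sources are bounded, otherwise STREAMING |

9. Packaging and Submitting the Project

9.1 Add the Packaging Plugin

Add the Maven Shade plugin to pom.xml (inside <project>, after <dependencies>) to build a jar that bundles its dependencies:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Drop signature files so the merged jar is not rejected as tampered -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <!-- Merge META-INF/services files from all dependencies -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

9.2 Build Command

```
# In IDEA: Maven tool window on the right → Lifecycle → package
# Or in a terminal:
$ mvn clean package
```

After the build, the target directory contains two jars:
- FlinkTutorial-1.0-SNAPSHOT.jar: the shaded jar that bundles all dependencies (the Shade plugin replaces the main artifact with it)
- original-FlinkTutorial-1.0-SNAPSHOT.jar: the original jar without dependencies, which is enough when the cluster already provides everything the job needs

9.3 Submit to the Cluster

```
# Start a local Flink cluster
$ bin/start-cluster.sh

# Submit the job
$ bin/flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

# Web UI (port 8081 by default):
# http://localhost:8081
```

Note that input/words.txt is a relative path: inside IDEA it resolves against the project root, but on a cluster it resolves against the working directory of the TaskManager process, so an absolute path is safer there.

10. FAQ

Q1: Why does the DataStream API need execute()?

Flink uses lazy execution:
- main() only defines the job: each operation is added to the dataflow graph, and no data is processed at this point (the data may not even have arrived yet).
- execute() triggers the actual job execution and blocks until the job finishes.

Q2: What happens if a lambda is used without returns()?

The lambda's output type Tuple2<String, Long> appears only as the generic parameter of Collector, which Java erases at compile time, so Flink cannot determine it. Example error:

```
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function main(LambdaWordCount.java:24) could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method.
```

Q3: Why is there a number in front of each result?

The number is the index of the parallel subtask that printed the record; 5> means it came from subtask 5. When running locally in the IDE, the default parallelism equals the number of CPU cores. It can be set as follows:

```java
// Set the parallelism globally
env.setParallelism(2);

// Set the parallelism of a single operator
sum.print().setParallelism(1);  // a single print subtask: the "N>" prefix is omitted
```

With a print parallelism of 1, every record is printed by the same subtask and the prefix disappears, but records arriving from different upstream subtasks can still interleave in any order.

11. Summary

11.1 Key Points

| Topic | Content |
|---|---|
| Project setup | Maven project + Flink 1.17 |
| Dependencies | flink-streaming-java + flink-clients |
| DataSet API | Deprecated; for reference only, not recommended for new projects |
| DataStream API | Officially recommended; unified stream/batch, one API for both scenarios |
| Lambda simplification | Must be combined with returns() to work around type erasure |
| Lazy execution | env.execute() must be called to trigger the job |
| Packaging and submission | Package with the Maven Shade plugin, submit with flink run |

11.2 The Value of Unified Stream/Batch Processing

- ✅ Lower learning cost: only one API to master
- ✅ Lower development cost: the same logic does not have to be written twice
- ✅ Lower maintenance cost: upgrades and optimizations happen in one place
- ✅ Consistent results: the same logic produces the same final results in streaming and batch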
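The "consistent results" point, and the difference between the outputs in 5.2 and 6.3, can be illustrated without a cluster. The sketch below is plain Java with no Flink dependency: the class `StreamVsBatchSketch` and its methods `streamingCounts` and `batchCounts` are hypothetical names, and they only imitate the semantics of a keyed `sum` rather than calling Flink's API. The streaming-style method emits an updated count for every incoming word; the batch-style method reads all input first and emits one final count per word; the last streaming update per key matches the batch result.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java illustration (hypothetical, no Flink dependency) of a keyed
 * running sum in streaming style vs. batch style.
 */
public class StreamVsBatchSketch {

    // Streaming style: keep per-key state and emit the updated count for every word
    static List<Map.Entry<String, Long>> streamingCounts(List<String> lines) {
        Map<String, Long> state = new LinkedHashMap<>();
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Long count = state.merge(word, 1L, Long::sum);
                emitted.add(new AbstractMap.SimpleEntry<>(word, count));
            }
        }
        return emitted;
    }

    // Batch style: consume the whole bounded input, then report one final count per word
    static Map<String, Long> batchCounts(List<String> lines) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                result.merge(word, 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same content as input/words.txt
        List<String> lines = Arrays.asList("hello flink", "hello world", "hello java");

        List<Map.Entry<String, Long>> updates = streamingCounts(lines);
        Map<String, Long> finals = batchCounts(lines);

        // Streaming: one output record per input word
        for (Map.Entry<String, Long> e : updates) {
            System.out.println("stream: (" + e.getKey() + "," + e.getValue() + ")");
        }
        // Batch: one output record per distinct word
        for (Map.Entry<String, Long> e : finals.entrySet()) {
            System.out.println("batch:  (" + e.getKey() + "," + e.getValue() + ")");
        }

        // Keep only the last streaming update per key and compare with the batch result
        Map<String, Long> lastUpdates = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : updates) {
            lastUpdates.put(e.getKey(), e.getValue());
        }
        System.out.println("same final counts: " + lastUpdates.equals(finals));
    }
}
```

Running `main` prints six `stream:` lines (hello appears with counts 1, 2 and 3), four `batch:` lines, and `same final counts: true`. The LinkedHashMap is only there to keep this printout deterministic; in Flink, the order of records across parallel subtasks is not guaranteed.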