【FlinkSQL笔记】(二)Flink SQL 基础语法详解
一、数据类型常用Flink SQL 数据类型和MySQL高度兼容常用类型如下STRING字符串对应MySQL varcharINT整型BIGINT长整型计数、时间戳常用DOUBLE浮点型TIMESTAMP(3)高精度时间戳实时任务必备保留3位毫秒BOOLEAN布尔值二、核心建表语法重点Kafka源表为主Kafka是Flink实时任务最核心数据源以下是生产通用标准模板可直接复用-- 实时数据源读取Kafka JSON数据CREATETABLEkafka_source(vin STRING,-- 车辆唯一标识online_statusINT,-- 在线状态high_voltage STRING,-- 高压状态event_timeTIMESTAMP(3),-- 数据产生时间核心用于开窗、水印-- 水印定义容忍5秒数据乱序生产通用配置WATERMARKFORevent_timeASevent_time-INTERVAL5SECOND)WITH(connectorkafka,topicvehicle_realtime_data,-- 你的Kafka主题名properties.bootstrap.servers127.0.0.1:9092,-- Kafka地址properties.group.idflink_sql_consumer_01,-- 消费者组formatjson,-- 数据格式JSONscan.startup.modelatest-- 启动消费位置最新数据);参数说明connector指定数据源类型固定kafkatopic需要消费的Kafka主题名称bootstrap.serversKafka集群地址端口group.id消费者组自定义不重复即可用于记录消费偏移量format数据序列化格式企业99%为jsonscan.startup.mode启动规则latest从当前最新数据开始消费生产默认earliest从头消费所有历史数据测试用三、 结果表建表语法数据输出用于将实时计算结果写入Kafka、MySQL等存储模板如下-- 结果输出表写入KafkaCREATETABLEkafka_sink(vin STRING,online_durationBIGINT,alert_timeTIMESTAMP(3),alert_msg STRING)WITH(connectorkafka,topicvehicle_alert_result,properties.bootstrap.servers127.0.0.1:9092,formatjson);四、Flink SQL 常用查询语法和标准SQL基本一致1、 数据过滤 WHERESELECT*FROMkafka_sourceWHEREonline_status1-- 只筛选在线车辆ANDhigh_voltage!01;-- 筛选未上高压车辆2、字段选取、别名SELECTvinAScar_no,online_status,event_timeAScreate_timeFROMkafka_source;3、分组聚合 GROUP BYSELECTvin,COUNT(*)ASdata_count,-- 单车辆上报次数MAX(online_status)ASmax_statusFROMkafka_sourceGROUPBYvin;4、去重 DISTINCTSELECTDISTINCTvinFROMkafka_source;