EMR Serverless Spark 推出 Spark 4.0加速湖仓架构下的数据处理升级Apache Spark 4.0 正式发布了。这是 Spark 项目自诞生以来变化最大的一次版本升级——全新的 VARIANT 数据类型、原生 SQL UDF、重新设计的基础设施架构、以及对 Python 生态的全面增强。阿里云 EMR Serverless Spark当前已适配 Spark 4.0 企业用户可直接在生产环境使用这些能力无需自建集群、无需手动升级、无需担心兼容性。本文将介绍 Spark 4.0 带来的核心能力变革以及 Serverless Spark 在此基础上为企业级场景做的额外增强。一、Spark 4.0 带来了哪些核心新能力1. VARIANT 类型JSON 半结构化数据处理性能提升数倍这是 Spark 4.0 最值得关注的新特性专门解决企业级数据平台中长期存在的半结构化数据处理痛点。传统方案的局限性半结构化数据如 JSON 在 Spark 中常以 String 类型存储。这种方案具备开放和灵活的优点但在查询性能方面存在明显问题主要表现在两个方面无法应用结构化数据优化JSON String 的数据类型无法应用到列式裁剪和谓词下推等结构化数据处理中最常用的优化手段解析效率低下在获取 JSON 中具体 key 对应数据时需要解析整个 JSON 对象如 get_json_object--传统方法STRING 类型存储查询时实时解析CREATETABLEuser_events(event_idBIGINT,raw_payload STRING-- JSON 原文以字符串存储);-- 查询时需要反复解析SELECTget_json_object(raw_payload,$.user_id)ASuser_id,get_json_object(raw_payload,$.event_type)ASevent_type,get_json_object(raw_payload,$.timestamp)ASevent_time,get_json_object(raw_payload,$.properties.source)ASsourceFROMuser_eventsWHEREget_json_object(raw_payload,$.event_type)page_viewANDdt2025-03-15;该方案的核心问题性能损耗每次调用get_json_object都需完整解析 JSON取 N 个字段即解析 N 次优化器无法介入WHERE 条件中的 JSON 路径无法下推需全量数据扫描后过滤Schema 僵化若使用from_json预定义结构体上游字段变更将导致任务失败Spark 4.0 的解决方案Spark4 推出了 Variant 的数据类型能够满足半结构化数据需要的开放Open灵活Flexibel和高性能Fast。-- Spark 4.0VARIANT 类型存储CREATETABLEuser_events(event_idBIGINT,payload VARIANT-- 二进制编码自动索引);-- 数据写入时自动转换INSERTINTOuser_eventsSELECT1,parse_json({ user_id: U12345, event_type: purchase, timestamp: 2025-03-15T10:30:00, properties: {source: mobile_app, amount: 299.99} });-- 查询路径表达式直接访问简洁高效SELECTpayload:user_id::STRINGASuser_id,payload:event_type::STRINGASevent_type,payload:properties.amount::DECIMAL(10,2)ASamountFROMuser_eventsWHEREpayload:event_type::STRINGpurchase;-- 单次解析谓词可下推无需全量扫描技术对比维度STRING get_json_objectVARIANT存储格式JSON 原文字符串二进制编码 自动索引查询性能O(N) 重复解析O(1) 路径定位优化器支持黑盒无法下推路径表达式参与谓词下推Schema 灵活性需预定义或完全无结构动态适应结构变化语法简洁度冗长的函数调用直观的路径语法企业级应用场景用户行为埋点分析埋点数据结构频繁变化VARIANT 无需预定义 Schema天然适配敏捷迭代多源异构数据入湖不同业务系统的 JSON 结构差异大无需强行统一 Schema 即可入湖API 日志存储与分析RESTful API 的请求/响应体直接存储按需提取字段比预解析方案灵活数倍2. SQL UDF告别 Python UDF 性能瓶颈SQL 原生函数优化传统 UDF 的瓶颈Spark 3.x 中封装可复用逻辑需使用 Python/Java UDF但存在本质缺陷# Spark 3.xPython UDFfrompyspark.sql.functionsimportudffrompyspark.sql.typesimportDoubleTypeudf(returnTypeDoubleType())defcalculate_discount(price,member_level):rates{1:0.95,2:0.90,3:0.85}returnprice*rates.get(member_level,1.0)# 优化器无法分析 UDF 内部逻辑# - 无法常量折叠# - 无法谓词下推# - 存在 JVM ↔ Python 跨进程开销df.select(calculate_discount(col(price),col(level)))Spark 4.0 SQL UDF纯 SQL 定义函数体优化器可直接内联展开并参与全局优化-- 定义 SQL UDFCREATEFUNCTIONcalculate_discount(priceDECIMAL(10,2),levelINT)RETURNSDECIMAL(10,2)RETURNCASElevelWHEN1THENprice*0.95WHEN2THENprice*0.90WHEN3THENprice*0.85ELSEpriceEND;-- 支持函数组合CREATEFUNCTIONfinal_price(priceDECIMAL(10,2),levelINT,tax_rateDECIMAL(4,2))RETURNSDECIMAL(10,2)RETURNcalculate_discount(price,level)*(1tax_rate);-- 优化器将完全展开并应用所有优化规则SELECT*FROMordersWHEREcalculate_discount(price,member_level)1000;表函数支持返回多行-- 生成日期序列CREATEFUNCTIONdate_range(start_dateDATE,end_dateDATE)RETURNSTABLE(dtDATE,day_of_week STRING)RETURNSELECTday,date_format(day,EEEE)FROM(SELECTsequence(start_date,end_date))AST(days)LATERALVIEWexplode(days)ASday;-- 直接使用SELECT*FROMdate_range(DATE2025-01-01,DATE2025-01-31);执行路径对比Python/Java UDFSQL → 优化器(跳过) → 序列化 → 跨进程调用 → 反序列化 SQL UDF SQL → 优化器(内联展开完整优化) → 直接执行3. PySpark 增强原生可视化、自定义数据源Python 数据工程全面升级Spark 4.0 针对 Python 生态进行了关键改进原生可视化 APIDataFrame 直接支持.plot()方法服务端完成聚合后仅传输绘图所需结果# Spark 3.x需先转 Pandas大数据量易 OOMdf_summarydf.groupBy(region).agg(sum(revenue).alias(total)).toPandas()df_summary.plot.bar(xregion,ytotal)# Spark 4.0服务端聚合后直接渲染df.groupBy(region).agg(sum(revenue).alias(total))\.plot.bar(xregion,ytotal)Python Data Source APISpark 4.0 新增 Python Data Source API无需 Java/Scala纯 Python 即可实现自定义数据源连接器支持批量读写。以对接阿里云 OSS 上的 JSON 文件为例只需实现三个类注册后像使用 Parquet、CSV 一样使用自定义数据源spark.dataSource.register(OssJsonDataSource)# 写入df.write.format(oss_json).option(path,data/events).mode(overwrite).save()# 读取spark.read.format(oss_json).option(path,data/events).load().show()4. 性能提升 30%、管道语法与基础设施全面升级整体性能提升Spark 4.0 通过查询优化器改进、执行引擎优化和内存管理增强在 TPC-DS 等基准测试中相比 Spark 3.x 性能提升约 30%。SQL 管道语法|复杂查询的书写顺序与数据处理顺序一致-- 传统写法从内到外阅读SELECTregion,totalFROM(SELECTregion,SUM(amount)AStotalFROMordersGROUPBYregion)WHEREtotal100000ORDERBYtotalDESC;-- 管道语法从上到下阅读FROMorders|AGGREGATESUM(amount)AStotalGROUPBYregion|WHEREtotal100000|ORDERBYtotalDESC;Structured Streaming 状态管理 v2Arbitrary State API v2单算子内管理多状态变量State DataSource直接读取和调试流状态数据大幅降低有状态流处理的开发和运维复杂度基础设施升级组件Spark 3.xSpark 4.0Scala2.122.13JDK8 / 1117支持 21Python3.83.9二、阿里云 EMR Serverless Spark 在 Spark 4.0 上做了哪些增强Paimon Variant 深度适配Serverless Spark 完成了 Spark 4.0 VARIANT 类型与 Apache Paimon 的深度集成。Paimon Variant 基于 Shredding 技术列式存储优化高频访问字段自动提取为独立列查询时直接列读取谓词下推过滤条件下推至存储层显著减少 IO类型安全路径查询具备编译期类型推断对于数据湖上的 JSON 密集型工作负载Paimon Variant 配合 Spark 4.0 的 VARIANT 类型提供了业界领先的存储 计算方案。Fusion 向量化引擎性能较开源 Spark 提升 3 倍Fusion 引擎是 EMR Serverless Spark 内置的高性能向量化 SQL 执行引擎。在 TPC-DS 基准测试中Fusion 相比开源 Spark 性能提升可达 3 倍。我们已完成 Fusion 引擎对 Spark 4.0 的全面适配零代码改动自动启用支持 Spark SQL 和 DataFrame 任务Python UDF 支持支持通过 Python UDF 扩展 Spark SQL 功能让企业能够使用 Python 生态灵活实现自定义业务逻辑从 Spark 3 升级到 Spark 4零改动平滑迁移方案针对企业存量作业迁移痛点阿里云 Serverless Spark 提供了平滑升级方案JDK 兼容性适配Spark 4.0 弃用了 JDK 8默认仅支持 JDK 17。Serverless Spark 通过默认添加 JDK 参数支持原生 JDK 8 编译的作业直接运行在 Spark 4.0 引擎上无需重新打包自动识别作业编译时的 JDK 版本运行时动态适配无需用户干预存量 JDK 8 作业零改动迁移核心参数对齐Spark 4.0 默认开启 ANSI SQL 模式spark.sql.ansi.enabledtrue可能导致存量 SQL 出现兼容性问题。Serverless Spark 默认关闭 ANSI 模式保持与 Spark 3.x 一致的行为存量作业无需修改即可正常运行用户可根据需要手动开启 ANSI 模式享受严格类型检查