组织每月在 AWS 上运行数百万个 Apache Spark 应用程序,移动、处理和准备用于分析和机器学习的数据。随着这些应用程序的老化,保持它们的安全和高效变得越来越具有挑战性。数据从业者需要升级到最新的 Spark 版本,才能从性能改进、新功能、错误修复和安全增强中受益。然而,这些升级通常很复杂、成本高昂且耗时。
今天,我们很高兴地宣布推出 Spark 生成式 AI 升级预览版,这是一项新功能,使数据从业者能够快速升级和现代化在 AWS 上运行的 Spark 应用程序。从 Spark 职位开始AWS胶水,此功能允许您从较旧的 AWS Glue 版本升级到 AWS Glue 版本 4.0。这项新功能减少了数据工程师花在 Spark 应用程序现代化上的时间,使他们能够专注于构建新的数据管道并更快地获得有价值的分析。
了解 Spark 升级挑战
升级 Spark 应用程序的传统过程需要大量的手动工作和专业知识。数据从业者必须仔细查看增量 Spark 发行说明,以了解重大更改的复杂性和细微差别,其中一些可能没有记录。然后,他们需要修改 Spark 脚本和配置,根据需要更新功能、连接器和库依赖项。
测试这些升级涉及运行应用程序并解决出现的问题。每次测试运行都可能会发现新问题,从而导致多次迭代的更改。升级后的应用程序成功运行后,从业者必须根据生产中的预期结果验证新输出。这个过程通常会变成长达一年的项目,耗资数百万美元并消耗数万个工程时间。
Spark 的生成式 AI 升级如何工作
Spark 升级功能使用 AI 自动识别和验证 AWS Glue Spark 应用程序所需的更改。让我们探讨一下这些功能如何协同工作来简化您的升级过程。
AI驱动的升级计划生成
当您启动升级时,该服务会使用 AI 分析您的应用程序,以识别 PySpark 代码和 Spark 配置之间的必要更改。在预览期间,Spark Upgrades 支持从 Glue 2.0(Spark 2.4.3、Python 3.7)升级到 Glue 4.0(Spark 3.3.0、Python 3.10),自动处理通常需要对公众进行大量手动审核的更改。火花,Python和胶版迁移指南,然后是开发、测试和验证。Spark 升级解决了四个关键的变化领域:星火SQL API方法和功能Spark 数据帧 API
- 方法和操作Python
- 语言更新(包括模块弃用和语法更改)Spark SQL 和核心
- 配置设置当您考虑到从 Spark 2.4.3 迁移到 Spark 3.3.0 涉及一百多个特定于版本的更改时,这些升级的复杂性就变得显而易见。
- 有几个因素导致执行手动升级面临挑战:极具表现力的语言
通过混合命令式和声明式编程风格,用户可以轻松开发 Spark 应用程序。
- 然而,这增加了在升级期间识别受影响代码的复杂性。惰性执行
- 分布式 Spark 应用程序中的转换提高了性能,但给用户带来了应用程序升级的运行时验证的挑战。火花配置
- 默认值的更改或跨版本引入新配置可能会以不同的方式影响应用程序行为,从而使用户难以在升级过程中识别问题。例如,在 Spark 3.2 中,Spark SQL
转换运算符无法支持输入中的别名。
在 Spark 3.1 及更早版本中,您可以编写一个脚本转换,例如使用 TBL 中的“cat”选择变换(a AS c1,b AS c2)
。
# 原始代码(Glue 2.0)查询=“”“SELECT TRANSFORM(商品作为产品名称,价格作为产品价格,数字作为产品编号)使用“猫”来自货物WHERE 商品.价格 > 5”“”Spark.sql(查询)# 更新代码(Glue 4.0)查询=“”“选择变换(商品、价格、数量)USING 'cat' AS (产品名称、产品价格、产品编号)来自货物WHERE 商品.价格 > 5”“”Spark.sql(查询)
在 Spark 3.1 中,加载和保存 1900-01-01 00:00:00Z 之前的时间戳为INT96
Parquet 文件中的内容会导致错误。在 Spark 3.0 中,这不会失败,但可能会因日历变基而导致时间戳发生变化。要恢复 Spark 3.1 中的旧行为,您需要配置 Spark SQL 配置:Spark.sql.legacy.parquet.int96RebaseModeInRead
和Spark.sql.legacy.parquet.int96RebaseModeInWrite
到遗产
。
# 原始代码(Glue 2.0)数据 = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])df = Spark.createDataFrame(数据, schema=schema)df.write.mode(“覆盖”).parquet(“path/to/parquet_file”)# 更新代码(Glue 4.0)qspark.conf.set(“spark.sql.legacy.parquet.int96RebaseModeInRead”,“LEGACY”)Spark.conf.set(“spark.sql.legacy.parquet.int96RebaseModeInWrite”,“LEGACY”)数据 = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])df = Spark.createDataFrame(数据, schema=schema)df.write.mode(“覆盖”).parquet(“path/to/parquet_file”)
在您的环境中自动验证
确定必要的更改后,Spark Upgrades 通过在您的 AWS 账户中将升级的应用程序作为 AWS Glue 作业运行来验证升级的应用程序。该服务会迭代多次验证运行(最多 10 次),检查每次迭代中遇到的任何错误并完善升级计划,直到成功运行。您可以使用通过用于验证运行的 Glue 作业参数提供的模拟数据集在开发帐户中运行 Spark 升级分析。
Spark Upgrades 成功验证更改后,会提供升级计划供您查看。然后,您可以接受更改并将其应用到开发帐户中的作业,然后将其复制到生产帐户中的作业。Spark升级计划包括以下内容:
- 升级摘要以及在此过程中进行的代码更新的说明
- 您可以用来代替当前脚本的最终脚本
- 验证运行的日志显示如何识别和解决问题
在决定将更改应用到生产作业之前,您可以查看升级的所有方面,包括中间验证尝试和任何错误解决方案。这种方法可确保您完全了解和控制升级过程,同时受益于人工智能驱动的自动化。
开始进行生成式 AI Spark 升级
让我们逐步完成将 AWS Glue 2.0 作业升级到 AWS Glue 4.0 的过程。完成以下步骤:
- 在 AWS Glue 控制台上,选择ETL职位在导航窗格中。
- 选择您的 AWS Glue 2.0 作业,然后选择使用 AI 运行升级分析。
- 为了结果 小路, 进入
s3://aws-glue-assets-<账户 ID>-<区域>/scripts/upgraded/
(提供您自己的账户 ID 和 AWS 区域)。 - 选择跑步。
- 上升级分析选项卡,等待分析完成。
分析运行时,您可以在以下位置查看中间作业分析尝试(最多 10 次)以进行验证跑步选项卡。此外,S3 升级总结记录迄今为止 Spark Upgrade 服务所做的升级,并通过每次尝试完善升级计划。每次尝试都会显示不同的失败原因,服务会尝试通过代码或配置更新在后续尝试中解决该问题。
成功分析后,升级后的脚本和更改摘要将上传到亚马逊简单存储服务(亚马逊 S3)。 - 查看更改以确保它们满足您的要求,然后选择应用升级后的脚本。
您的作业现已成功升级到 AWS Glue 版本 4.0。您可以检查脚本选项卡以验证更新的脚本和职位详情选项卡以查看修改后的配置。
通过示例了解升级过程
我们现在展示一个生产 Glue 2.0 作业,我们希望使用 Spark Upgrade 功能将其升级到 Glue 4.0。此 Glue 2.0 作业读取不同分区下的 S3 存储桶中每日更新的数据集,其中包含来自在线市场的新书评,并运行 SparkSQL 来收集有关用户对书评投票的见解。
原始代码 (Glue 2.0) — 升级前
导入系统从 awsglue.transforms 导入 *从 awsglue.utils 导入 getResolvedOptions从 pyspark.context 导入 SparkContext从 awsglue.context 导入 GlueContext从 awsglue.job 导入作业sc = SparkContext.getOrCreate()glueContext = GlueContext(sc)火花=glueContext.spark_session工作 = 工作(glueContext)从集合导入序列从 pyspark.sql.types 导入 DecimalType从 pyspark.sql.functions 导入 lit, to_timestamp, coldef is_data_type_sequence(coming_dict):如果 isinstance(coming_dict, Sequence) 返回 True,否则返回 Falsedef dataframe_to_dict_list(df):返回 [row.asDict() for row in df.collect()]书籍输入路径 = (“s3://aws-bigdata-blog/ generated_synthetic_reviews/data/product_category=Books/”)view_name =“书籍临时视图”静态日期=“2010-01-01”书籍来源_df = (Spark.read.option("标题", "true").option("recursiveFileLookup", "true").option("路径", books_input_path).镶木地板(书籍输入路径))books_source_df.createOrReplaceTempView(view_name)books_with_new_review_dates_df = Spark.sql(f"""选择{视图名称}.*,DATE_ADD(to_date(review_date), "180.8") AS next_review_date,案件当 DATE_ADD(to_date(review_date), "365") < to_date('{static_date}') 那么 '是'否则“不”END AS 可操作来自{视图名称}”“”)books_with_new_review_dates_df.createOrReplaceTempView(view_name)aggregate_books_by_marketplace_df = Spark.sql(f“选择市场,计数({view_name}.*)作为total_count,avg(star_ rating)作为average_star_ ratings,avg(helpful_votes)作为average_helpful_votes,avg(total_votes)作为average_total_votes来自{view_name}组(按市场)”)aggregate_books_by_marketplace_df.show()数据 = dataframe_to_dict_list(aggregate_books_by_marketplace_df)如果 is_data_type_sequence(数据):print("数据有效")别的:引发 ValueError("数据无效")aggregate_target_books_df =aggregate_books_by_marketplace_df.withColumn("average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2)))aggregate_target_books_df.show()
新代码 (Glue 4.0) — 升级后
导入系统从 awsglue.transforms 导入 *从 awsglue.utils 导入 getResolvedOptions从 pyspark.context 导入 SparkContext从 awsglue.context 导入 GlueContext从 awsglue.job 导入作业from collections.abc import 序列从 pyspark.sql.types 导入 DecimalType从 pyspark.sql.functions 导入 lit, to_timestamp, colsc = SparkContext.getOrCreate()glueContext = GlueContext(sc)火花=glueContext.spark_sessionSpark.conf.set("spark.sql.adaptive.enabled", "false")Spark.conf.set("spark.sql.legacy.allowStarWithSingleTableIdentifierInCount", "true")Spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")工作 = 工作(glueContext)def is_data_type_sequence(coming_dict):如果 isinstance(coming_dict, Sequence) 返回 True,否则返回 Falsedef dataframe_to_dict_list(df):返回 [row.asDict() for row in df.collect()]书籍输入路径 = (“s3://aws-bigdata-blog/ generated_synthetic_reviews/data/product_category=Books/”)view_name =“书籍临时视图”静态日期=“2010-01-01”书籍来源_df = (Spark.read.option("标题", "true").option("recursiveFileLookup", "true").load(书籍输入路径))books_source_df.createOrReplaceTempView(view_name)books_with_new_review_dates_df = Spark.sql(f"""选择{视图名称}.*,DATE_ADD(to_date(review_date), 180) AS next_review_date,案件当 DATE_ADD(to_date(review_date), 365) < to_date('{static_date}') 那么 '是'否则“不”END AS 可操作来自{视图名称}”“”)books_with_new_review_dates_df.createOrReplaceTempView(view_name)aggregate_books_by_marketplace_df = Spark.sql(f“选择市场,计数({view_name}.*)作为total_count,avg(star_ rating)作为average_star_ ratings,avg(helpful_votes)作为average_helpful_votes,avg(total_votes)作为average_total_votes来自{view_name}组(按市场)”)aggregate_books_by_marketplace_df.show()数据 = dataframe_to_dict_list(aggregate_books_by_marketplace_df)如果 is_data_type_sequence(数据):print("数据有效")别的:引发 ValueError("数据无效")aggregate_target_books_df =aggregate_books_by_marketplace_df.withColumn("average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2)))aggregate_target_books_df.show()
升级总结
在 Spark 3.2 中,spark.sql.adaptive.enabled 默认启用。要恢复 Spark 3.2 之前的行为,您可以将spark.sql.adaptive.enabled设置为false。对于此特定错误,在提供的上下文中找不到合适的迁移规则。该更改是根据错误消息进行的,该消息表明无法从集合模块导入序列。在Python 3.10中,Sequence已移至collections.abc模块。在 Spark 3.1 中,当使用路径参数调用以下方法时,路径选项不能共存:DataFrameReader.load()、DataFrameWriter.save()、DataStreamReader.load() 或 DataStreamWriter.start()。此外,DataFrameReader.load() 的路径选项不能共存。例如,spark.read.format(csv).option(path, /tmp).load(/tmp2)或spark.read.option(path, /tmp).csv(/tmp2)将抛出org.apache.spark.sql.AnalysisException。在Spark 3.0及以下版本中,如果向上述方法传递一个路径参数,则路径选项将被覆盖;如果将多个路径参数传递给 DataFrameReader.load(),则路径选项将添加到整体路径中。要恢复Spark 3.1之前的行为,可以将spark.sql.legacy.pathOptionBehavior.enabled设置为true。在 Spark 3.0 中,“date_add”和“date_sub”函数仅接受 int、smallint、tinyint 作为第二个参数;小数和非文字字符串不再有效,例如:`date_add(cast('1964-05-23' as date), '12.34')`会导致`AnalysisException`。请注意,仍然允许使用字符串文字,但如果字符串内容不是有效的整数,Spark 将抛出 AnalysisException。在 Spark 2.4 及更低版本中,如果第二个参数是小数或字符串值,则它会被强制转换为 int 值,结果是日期值“1964-06-04”。在 Spark 3.2 中,count(tblName.*) 的使用被阻止以避免产生不明确的结果。因为如果存在空值,count(*) 和 count(tblName.*) 的输出会有所不同。要恢复 Spark 3.2 之前的行为,可以将spark.sql.legacy.allowStarWithSingleTableIdentifierInCount 设置为true。在Spark 3.0中,默认不允许使用负数小数位数,例如1E10BD这样的字面量的数据类型是DecimalType(11, 0)。在 Spark 2.4 及以下版本中,它是 DecimalType(2, -9)。要恢复Spark 3.0之前的行为,可以将spark.sql.legacy.allowNegativeScaleOfDecimal设置为true。
从更新后的 Glue 4.0 (Spark 3.3.0) 脚本与 Glue 2.0 (Spark 2.4.3) 脚本的比较以及生成的升级摘要中可以看出,在六次尝试中总共应用了六种不同的代码和配置更新。Spark升级分析。
- 尝试 #1 包括Spark SQL 配置(
Spark.sql.adaptive.启用
)来恢复应用程序行为,这是从 Spark 3.2 开始引入的 Spark SQL 自适应查询执行的新功能。用户可以检查此配置更改,并可以根据自己的喜好进一步启用或禁用它。 - 尝试 #2 解决了Python语言变化Python 3.7 和 3.10 之间引入了新的抽象基类 (
ABC
)在Python集合模块下用于导入顺序
。 - 尝试 #3 解决了由于行为更改而遇到的错误数据帧API从 Spark 3.1 开始
小路
选项不能与其他选项共存数据帧读取器
运营。 - 尝试 #4 解决了由更改引起的错误Spark SQL 函数 API签名为
日期_添加
从 Spark 3.0 开始,它现在只接受整数作为第二个参数。 - 尝试 #5 解决了由于行为更改而遇到的错误Spark SQL 函数 API为了
计数(表名.*)
从 Spark 3.2 开始。随着新方法的引入,这种行为得到了恢复Spark SQL 配置Spark.sql.legacy.allowStarWithSingleTableIdentifierInCount
- 尝试#6 成功完成分析并在 Glue 4.0 上运行新脚本,没有出现任何新错误。最后的尝试解决了由于禁止使用负标度而遇到的错误
强制转换(DecimalType(3,-6)
在Spark 数据帧 API启动 Spark 3.0。该问题已通过启用新功能得到解决Spark SQL 配置Spark.sql.legacy.allowNegativeScaleOfDecimal。
预览的重要注意事项
当您在预览期间开始使用自动 Spark 升级时,为了优化该服务的使用,需要考虑几个重要方面:
服务范围及限制
- – 预览版本重点关注 PySpark 代码从 AWS Glue 版本 2.0 到版本 4.0 的升级。在撰写本文时,该服务处理不依赖于其他库依赖项的 PySpark 代码。您可以在一个 AWS 账户中同时运行最多 10 个作业的自动升级,从而使您能够高效地实现多个作业的现代化,同时保持系统稳定性。
- 优化升级过程中的成本– 由于该服务使用生成式 AI 通过多次迭代来验证升级计划,并且每次迭代都作为 AWS Glue 作业在您的账户中运行,因此优化验证作业运行配置以实现成本效益至关重要。为此,我们建议在开始升级分析时指定运行配置,如下所示:
- 使用非生产开发人员帐户并选择代表生产数据但尺寸较小的示例模拟数据集,以便通过 Spark 升级进行验证。
- 使用适当大小的计算资源(例如 G.1X 工作线程),并选择适当数量的工作线程来处理示例数据。
- 启用胶水自动缩放适用时根据工作负载自动调整资源。
例如,如果您的生产作业使用 20 个 G.2X 工作线程处理 TB 级的数据,您可以将升级作业配置为使用 2 个 G.2X 工作线程处理几 GB 的代表性数据,并启用自动缩放以进行验证。
- 预览最佳实践– 在预览期间,我们强烈建议您从非生产作业开始升级之旅。通过这种方法,您可以熟悉升级工作流程,并了解该服务如何处理不同类型的 Spark 代码模式。
您的经验和反馈对于帮助我们增强和改进此功能至关重要。我们鼓励您通过 AWS Support 或您的客户团队分享您的见解、建议和遇到的任何挑战。此反馈将帮助我们改进服务并添加预览期间对您最重要的功能。
结论
本文演示了自动化 Spark 升级如何帮助您在 AWS Glue 中迁移 Spark 应用程序。它使用生成式 AI 自动识别不同 Spark 版本之间必要的脚本更改,从而简化了迁移过程。
要了解有关 AWS Glue 中此功能的更多信息,请参阅AWS Glue 中 Apache Spark 的生成式 AI 升级。
特别感谢为在 AWS Glue 中启动 Apache Spark 生成式 AI 升级做出贡献的所有人:Shuai Zhang、Mukul Prasad、Liyuan Lin、Rishabh Nair、Raghavendhar Thiruvoipadi Vidyasagar、Tina Shao、Chris Kha、Neha Poonia、Xiaoxi Liu、JapsonJeyasekaran、Suthan Phillips、Raja Jaya Chandra Mannem、Yu-Ting Su、NeilJonkers、Boyko Radulov、Sujatha Rudra、Mohammad Sabeel、Mingmei Yang、Matt Su、Daniel Greenberg、Charlie Sim、McCall Petier、Adam Rohrscheib、Andrew King、Ranu Shah、Aleksei Ivanov、Bernie Wang、Karthik Seshadri、Sriram Ramarathnam、Asterios Katsifodimos、布罗迪·鲍曼、桑尼·科诺普列夫、Bijay Bisht、Saroj Yadav、卡洛斯·奥罗斯科、尼廷·巴哈杜尔、金舒克·帕哈雷、桑托什·钱德拉胡德和威廉·万贝内佩。
关于作者
关山则隆是 AWS Glue 团队的首席大数据架构师。他负责构建软件工件来帮助客户。在业余时间,他喜欢骑着他的新公路自行车。
克尔蒂·查达拉瓦达是 AWS Glue 的高级软件开发工程师,专注于将生成式 AI 和数据集成技术结合起来,设计和构建满足客户数据和分析需求的全面解决方案。
舒巴姆·梅塔是 AWS Analytics 的高级产品经理。他领导 AWS Glue、Amazon EMR 和 Amazon MWAA 等服务的生成式 AI 功能开发,使用 AI/ML 来简化和增强数据从业者在 AWS 上构建数据应用程序的体验。
普拉迪普·帕特尔是 AWS Glue 团队的软件开发经理。他热衷于利用 AWS 云的强大功能来帮助客户解决问题,提供高度可扩展且强大的解决方案。业余时间,他喜欢徒步旅行和玩网络应用程序。
刘楚涵是 AWS Glue 的软件工程师。他热衷于构建用于大数据处理、分析和管理的可扩展分布式系统。他还热衷于利用生成式人工智能技术为客户提供全新的体验。业余时间,他喜欢运动,喜欢打网球。
瓦伊巴夫·奈克是 AWS Glue 的软件工程师,热衷于构建强大、可扩展的解决方案来解决复杂的客户问题。他对生成式人工智能有着浓厚的兴趣,喜欢探索创新方法来开发利用尖端人工智能技术力量的企业级解决方案。
莫希特·萨克塞纳是 AWS Glue 和 Amazon EMR 团队的高级软件开发经理。他的团队专注于构建分布式系统,使客户能够通过简单易用的界面和 AI 驱动的功能,跨 Amazon S3 上的数据湖以及云上的数据库和数据仓库高效地转换 PB 级数据。