"); //-->
本文分享自天翼云开发者社区《spark-sql优化简述》,作者:徐****东
1、自适应中reduce参数控制
spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。
spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。
spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。
2、合理设置单partition读取数据量
SET spark.sql.files.maxPartitionBytes=xxxx;
3、合理设置shuffle partition的数量
SET spark.sql.shuffle.partitions=xxxx
4、使用coalesce & repartition调整partition数量
SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
5、使用broadcast join
6、开启Adaptive Query Execution(Spark 3.0)
6.1、动态合并分区: spark会根据分区的数据量将小数据量的多个分区合并成一个分区,可以提高资源的利用率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.coalescePartitions.enabled: 是否开启动态合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分区数
spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的推荐目标大小
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分区数
当RDD的分区数处于spark.sql.adaptive.coalescePartitions.initialPartitionNum与spark.sql.adaptive.coalescePartitions.minPartitionNum范围内才会合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分区之后,分区的数据量的预期大小
6.2、动态切换join策略: 在join的时候,会动态选择性能最高的join策略,提高效率
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.sql.adaptive.localShuffleReader.enabled:在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join
6.3、动态申请资源: 当计算过程中资源不足会自动申请资源
spark.sql.adaptive.enabled: 是否开启AQE优化
spark.dynamicAllocation.enabled: 是否开启动态资源申请
spark.dynamicAllocation.shuffleTracking.enabled: 是否开启shuffle状态跟踪
6.4、动态join数据倾斜: join的时候如果出现了数据倾斜,会动态调整分区的数据量,优化数据倾斜导致的性能问题。
spark.sql.adaptive.enabled: 是否开启AQE优化
倾斜的膨胀系数:spark.sql.adaptive.skewJoin.skewedPartitionFactor:N
倾斜的最低阈值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M
拆分粒度,以字节为单位:spark.sql.adaptive.advisoryPartitionSizeInBytes
G [代表优化之后,分区数数据的预期大小]
sparksql判断出现数据倾斜的依据[需要两个条件同时满足]:
当某个分区处理的数据量>= N * 所有task处理数据量的中位数
当某个分区处理的数据量>= M
7、文件与分区
SET spark.sql.files.maxPartitionBytes=xxx //读取文件的时候一个分区接受多少数据;
spark.sql.files.openCostInBytes//文件打开的开销,通俗理解就是小文件合并的阈值
8、CBO优化
spark.sql.cbo.enabled: 是否开启cbo优化
spark.sql.cbo.joinReorder.enabled: 是否调整多表Join的顺序
spark.sql.cbo.joinReorder.dp.threshold: 设置多表jion的表数量的阈值,一旦join的表数量超过该阈值则不优化多表join的顺序
9、hints优化
hints预防主要用在分区和join上。
Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE
Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
10、缓存表
对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(TableName)或者DataFrame.cache即可,SparkSQL会用内存列存储的格式进行表的缓存,然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存的使用和GC的开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,使用SQLContext.setConf()设置,可以通过
spark.sql.inMemoryColumnarStorage.batchSize
这个参数,默认10000,配置列存储单位。
永久视图 view:永久保存一段查询语句的逻辑,而不是查询语句的数据,永久有效,查询这个视图,相当于查询一个SQL语句,如果保存的查询逻辑复杂,这查询视图也耗时长。支持重新覆盖 create or replace view view1 as
临时视图 temporary view:只在当前会话生效,如果会话结束,则临时视图失效,支持重新覆盖 create or replace temporary view temp_view1 as,类似于 SparkSQL 中的 DataFrame.createOrReplaceTempView('视图名'),hive不支持这个语法
缓存表cache table:只在当前会话有效,将一段查询结果集缓存到内存,并赋予一个表名。
table:永久有效,保存数据结构和数据本身到磁盘。
with as:当子查询的嵌套层数太多时,可以用with as 增加可读性。
11、group by优化
为了提高 group by 查询的性能,可以尝试以下几种方法:
仅选择必要的字段进行 group by 操作,避免选择过多的字段。
尽可能将 group by 字段类型保持一致,以减少数据转换的开销。
如果可能,可以将 group by 字段进行哈希分区,以减少数据传输和处理的开销。
如果使用的是字符串类型,可以考虑使用哈希函数来减少字符串比较的开销。
12、优化倾斜连接
数据偏斜会严重降低联接查询的性能。此功能通过将倾斜的任务拆分(按需复制)为大小大致相等的任务来动态处理排序合并联接中的倾斜。同时启用spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置时,此选项才生效。
专栏文章内容及配图由作者撰写发布,仅供工程师学习之用,如有侵权或者其他违规问题,请联系本站处理。 联系我们
相关推荐
丰田将与腾讯在电动汽车人工智能、云计算和大数据方面展开合作
2024年政府工作十大任务发布,大数据、人工智能是重点
IDC预计,2028年中国大数据总体市场规模将超621亿美元
IDC预计,2029年中国大数据总体市场规模将超730亿美元
大数据时代到来,我们电子工程师如何应对?
大数据数据场景下的网络精准规划与优化
关于大数据的基本资料
昆山的朋友,你知道物联网行业的这个消息吗?
用大数据方法协助研发下一代电池电解液
GMIF2024聚焦产业创新之道 共谋存储生态繁荣发展
数据中心的核心:飞思卡尔通信处理技术,助用户“一马当先”
上海:加快智算芯片国产化部署
通讯协议对智能家居的影响究竟有多大?
使用高级查询—SQL语言
数据库语言SQL
基于ARM的非接触IC卡网络系统
晶圆代工厂商牵手RISC-V企业,瞄准低功耗AI芯片
Accessing MS SQL Server CE v1.0v2.0 without the ADO object
工信部:我国新能源汽车发展正带动产业生态全面重塑
车载模块原理分析与电路设计详解
大数据对网络技术和产业的挑战
SQL Server 2000
大数据与可穿戴设备将改变生活
基于大数据与深度学习的穿戴式运动心率算法
大数据:高端安全检测的必由之路