浅谈MaxCompute SQL调优


1.写在前面

​ 今天跟大家分享的内容是maxcompute的优化内容,希望能够起到抛砖引玉的作用。我将从三个方面开展这次的主题分享,第一个部分是MC和hive的对比;第二部分是调优方法论,主要是想总结一下我们从代码提交到执行结束,都有哪些位置可能会出现问题,一般情况下又是怎么解决的;最后是结合具体案例,对长尾问题的一些思考。

目录

2. MaxCompute对比Hive

​ 首先,先对其简单做个介绍,MaxCompute主要是应用在批量结构化数据的存储和计算,主要是数据仓库的和大数据的分析建模方向。这个是MaxCompute系统架构,可以看到MaxCompute支持SQL查询计算,MapReduce计算,支持机器学习,和面向迭代的图计算,对外提供三种Java API接口,非常简洁。MaxCompute和Hive一样可以使用SQL、UDF以及MapReduce开发。基于MaxCompute的无服务器的设计思路,用户只需关心业务和数据,不需要关心底层分布式架构及运维。

MaxCompute介绍

​ 对于MaxCompute,可将飞天视为类似于Hadoop的框架,如Hive基于Hadoop,那么可以理解为MaxCompute基于飞天系统。大家都知道hive依赖hdfs进行数据存储,依赖yarn进行资源管理,正好也对应着盘古和伏羲。为什么阿里要去hadoop,做一个自己的系统,我的看法是hadoop开源更新不可控,无法满足快速迭代的业务需求,不如另起炉灶,但是底层架构有很多相似之处。

​ Maxcompute和hive都是有自己的客户端的,另外maxcompute对hive sql也是有兼容的,只需要一行设置即可。下面是他们的数据源和数据格式,阿里肯定支持自家的ots和oss,均支持hdfs。Maxcompute支持手动和自动的调优,hive仅仅支持手动调优。

MaxCompute对比Hive

​ 找到了一个2016年的数据,100TB数据全排序,maxcompute比竞品要快要便宜。

MaxCompute性能试验

3.调优方法论

​ 我们优化一个sql的前提是这个代码能够成功运行,需要知道提交一个任务都需要经历哪几个阶段。一个sql一般我们叫做一个odps作业,在我们提交一个odps作业之后,都会发生哪些动作呢?我们将整个sql的运行分为预处理、编译、执行和结束这四个阶段。

​ 以上四个阶段只有预处理阶段没有logview产生,而且预处理阶段一般不会出现问题,这个阶段我们不做考虑,其他三个过程,又会因哪些原因发生阻塞呢,根据细分规则,我们接下来看如果我们的作业在这些位置被卡住,将会有哪些表现,并且我们分析是哪些原因导致的,以及可以采取哪些解决方案。

SQL执行过程分析

3.1 编译阶段

​ 首先,编译阶段指的是什么时候呢?指的是已经了产生logview,但还没有执行计划。根据 logview 的子状态(SubStatusHistory)可以进一步细分为调度、优化、生成物理执行计划、数据跨集群复制等子阶段。编译阶段的问题主要表现为在某个子阶段卡住,即作业长时间停留在某一个子阶段。下面将介绍作业停留在每个子阶段的表现以及可能原因和解决方法。

编译阶段

3.1.1 调度阶段

【特征】子状态为“Waiting for cluster resource”,作业排队等待被编译。

【该阶段作业卡住的可能原因 1 】计算集群资源紧缺。

【解决方法】查看计算集群的状态,需要等待计算集群的资源。

【该阶段作业卡住的可能原因 2 】编译资源池资源不够:可能有人不小心用脚本一次提交太多作业,把编译资源池占满了。如果发现这种情况,要及时找值班同学咨询。

3.1.2 优化阶段

【特征】子状态为“SQLTask is optimizing query”,优化器正在优化执行计划。

【该阶段作业卡住的可能原因】如果执行计划复杂,需要等待较长时间做优化。

【解决方法】一般可接受的时间是10分钟以内,如果真的太长时间不退出,基本上可以认为是 odps 的 bug

3.1.3 生成物理执行计划阶段

【特征】子状态为“SQLTask is generating execution plan”。

【该阶段作业卡住的可能原因 1 】读取的分区太多。每个分区需要去根据分区信息来决定处理方式,决定 split,并且会写到生成的执行计划中。

【解决方法】需要好好设计 SQL,减少分区的数量,包括:分区裁剪、筛除不需要读的分区、把大作业拆成小作业。

【该阶段作业卡住的可能原因 2 】小文件太多。ODPS 会根据文件大小决定 split,小文件多了会导致计算 split 的过程耗时增加。

【解决方法】避免小文件,可以执行一次 alter table merge smallfiles; 让 odps 把小文件 merge 起来

【注意】上面提到的“分区太多,小文件太多”不是指几十、几百个。基本都是要上万,上十万才会对生成物理执行计划的时间产生较大影响。

3.1.4 数据跨集群复制阶段

【特征】子状态列表里面出现多次“Task rerun”,result 里有错误信息“FAILED: ODPS-0110141:Data version exception”。作业看似失败了,实际还在执行,说明作业正在做数据的跨集群复制。

【该阶段作业卡住的可能原因 1 】project 刚做集群迁移,往往前一两天有大量需要跨集群复制的作业。

【解决方法】这种情况是预期中的跨集群复制,需要用户等待。

【该阶段作业卡住的可能原因 2 】可能是作业提交错集群,或者是中间 project 做过迁移,分区过滤没做好,读取了一些比较老的分区。

【解决方法】检查作业提交的集群是否正确。过滤掉不必要读取的老分区。

编译阶段

3.2 执行阶段

​ 执行阶段一般指的是:logview 的 detail 界面有执行计划(执行计划没有全都绿掉),且作业状态还是 Running。执行阶段卡住或执行时间比预期长的主要原因有等待资源,数据倾斜,UDF 执行低效,数据膨胀等等,下面将具体介绍每种情况的特征和解决思路。

执行阶段

3.2.1 等待资源

【特征】instance 处于 Ready 状态,或部分 instance 是 Running,部分是 Ready 状态。

【解决思路】确定排队状态是否正常。可以通过 logview 的排队信息“Queue”看作业在队列的位置,作业有排队是正常的。作业的调度顺序不仅与作业提交时间、优先级有关,还和作业所需内存或CPU资源大小能否被满足有关,因此合理设置作业的参数很重要。

3.2.2 数据倾斜

【特征】task 中大多数 instance 都已经结束了,但是有某几个 instance 却迟迟不结束(长尾),这些 instance 运行的慢,可能是因为处理的数据多。

【解决方法】需要找到造成数据倾斜的具体位置,对症下药,利用 Logveiw2.0 查看任务执行图和 instance 运行情况来定位长尾实例。在确定造成数据倾斜的实例、数据来源等信息后,用户需要针对性的对代码甚至算法做一定的修改。

3.2.3 UDF执行低效

​ 这里的 UDF 泛指各种用户自定义的扩展,包括UDF,UDAF,UDTF,UDJ,UDT等。

【特征】某个 task 执行效率低,且该 task 中有用户自定义的扩展。甚至是 UDF 的执行超时报错。

【排查方法】任务报错时,可以在 MaxCompute Studio 中快速通过 DAG 图判断报错的 task 中是否包含 UDF。此外,在 task 的 stdout 日志里,UDF 框架会打印 UDF 输入的记录数、输出记录数、以及处理时间,一般来讲,正常情况 Speed(records/s) 在百万或者十万级别,如果降到万级别,那么基本上就有性能问题了。

【解决思路】

  • 检查 UDF 是否有 bug,有时候 bug 是由于某些特定的数据值引起的,比如出现某个值的时候会引起死循环。
  • 检查 UDF 函数是否与内置函数同名。内置函数是有可能被同名 UDF 覆盖的。有相似功能的内置函数的情况下,尽可能不要使用 UDF。内置函数一般经过验证,实现比较高效,并且内置函数对优化器而言是白盒,能够做更多的优化。
  • UDF效率低的原因也可能是内存原因,某些UDF在内存计算、排序的数据量比较大时,会报内存溢出错误;内存不足也会导致gc频率过高。可以尝试调整内存参数解决内存不足的问题。

3.2.4 数据膨胀

【特征】task 的输出数据量比输入数据量大很多。比如 1G 的数据经过处理,变成了 1T,在一个 instance 下处理 1T 的数据,运行效率肯定会大大降低。输入输出数据量体现在 Task 的 I/O Record 和 I/O Bytes 这两项。

【解决思路】

  • 检查代码是否有 bug;
  • JOIN 条件是不是写错了,变成笛卡尔积了;
  • 检查 Aggregation聚合 引起的数据膨胀。因为大多数 aggregator聚合 是 recursive递归 的,中间结果先做了一层 merge,中间结果不大,而且大多数 aggregator 的计算复杂度比较低,即使数据量不小,也能较快完成。所以通常情况下这些操作问题不大,但是select 中使用 aggregation 按照不同维度做 distinct,每一次 distinct 都会使数据做一次膨胀;使用 Grouping Sets (CUBE and ROLLUP) ,中间数据可能会扩展很多倍。有些操作如 collect_list、median 操作需要把全量中间数据都保留下来,可能会产生问题。
  • 避免join引起的数据膨胀。比如:两个表 join,左表是人口数据,数据量很大,但是由于并行度足够,效率可观。右表是个维表,记录每种性别对应的一些信息(比如每种性别可能的坏毛病),虽然只有两种性别,但是每种都包含数百行。那么如果直接按照性别来 join,可能会让左表膨胀数百倍。要解决这个问题,可以考虑先将右表的行做聚合,变成两行数据,这样 join 的结果就不会膨胀了。

执行阶段

3.3 结束阶段

​ 最后是结束阶段:大部分 SQL 作业在 Fuxi 作业结束后即停止。但有时 Fuxi 作业结束时,作业总体进度仍然处于运行状态。如下图中的 logview,右侧 Job Details 页面显示 Fuxi 作业所有阶段为 Terminated,但左侧代表作业整体进度的 Status 仍然显示 Running,造成这种现象的情况一般分为两种:一个 SQL 作业可能包含多个 Fuxi 作业,比如子查询多阶段执行,作业输出小文件过多导致的自动合并作业;Fuxi 作业结束后,SQL 在结束阶段运行于控制集群的逻辑占用时间较长,比如更新动态分区的元数据。

​ 下面将举例介绍几种典型。

结束阶段

3.3.1 子查询多阶段执行

​ 大部分情况下,MaxCompute SQL 的子查询会被编译进同一个 Fuxi DAG,即所有子查询和主查询都通过一个 Fuxi 作业完成。但也有一些特殊子查询需要先将子查询单独执行。

SELECT product, sum(price) FROM sales WHERE ds in (SELECT DISTINCT ds FROM t_ds_set) GROUP BY product;

​ 子查询 SELECT DISTINCT ds FROM t_ds_set 先执行,其结果需要被用来做分区裁剪,来优化主查询需要读取的分区数。这两次运行就会产生两个 fuxi 作业。

3.3.2 过多小文件

​ 小文件主要带来存储和计算两方面问题。存储方面,小文件过多会给 Pangu 文件系统带来一定的压力,且影响空间的有效利用;计算方面,ODPS 处理单个大文件比处理多个小文件更有效率,小文件过多会影响整体的计算执行性能。因此,为了避免系统产生过多小文件,SQL 作业在结束时会针对一定条件自动触发合并小文件的操作。小文件的判定,是通过参数 odps.merge.smallfile.filesize.threshold 来设置的,该配置默认为 32MB,小于这个阈值的文件都会被判定为小文件。

​ 自动合并小文件多出来的 merge task,虽然会增加当前作业整体执行时间,但是会让结果表在合并后产生的文件数和文件大小更合理,从而避免对文件系统产生过大压力,也使得表被后续的作业使用时,拥有更好的读取性能。如果结果实际不大,但是文件数过多,那么最好是先检查下小文件阈值配置是否正确。

3.3.3 动态分区元数据更新

​ Fuxi 作业执行完后,有可能还有一些元数据操作。比如要把结果数据挪到特定目录去,然后把表的元数据更新。有可能动态分区输出了太多分区,那么还是可能会消耗一定的时间的。

​ Fuxi 作业执行结束后,仍需要一段时间进行表的元数据更新。

结束阶段

4.计算长尾案例分享

​ 最后,我们来讨论一些长尾的案例。长尾问题是分布式计算里最常见的问题之一,也是典型的疑难杂症。究其原因,是因为数据分布不均,导致各个节点的工作量不同,整个任务就需要等最慢的节点完成才能完成。我们从长尾现象和原因入手,针对实际业务处理过程中,遇到的长尾现象,分享给大家优化思路和解决方案。

​ 正式进入这个主题之前,我们需要知道几个概念。我们提交到maxcompute上的任务都是由1个到n个fuxi job组成的。图中的任务就是由3个fuxi job来完成的,每一个fuxi job是由1个到n个fuxi task组成的。一个fuxi task是由1个到n个fuxi instance完成。复习了这些概念之后,我们进入下面的主题。

Fuxi概念区分

​ 我们如何查看任务是否发生了长尾?长尾的现象是什么?logview是帮助我们排查这些问题重要的工具和手段,我们一般会逐一点击task,页面下端会显示这个task任务的所有instance的运行情况,有运行时间,起止时间,如果看到红框内有long tails说明发生了长尾。系统是怎么认定发生了长尾呢?是因为这个任务执行完成之后,系统会对每个instance时间做统计,求出一个平均值,一般来说如果某些instance是这个平均时间的两倍以上,就认定发生了长尾现象(logview1.0是三倍),logview2.0异常情况区分了长尾和数据倾斜,两个异常的关注点是不一样,数据倾斜关注的是数据的IO数据量大小,长尾关注的instance的执行时间。

长尾现象

​ 我们看一下为什么会发生长尾呢?我们举一个例子,这个例子就是简单的wordcount实际场景的例子。这个场景下会用到两个task,一个map,一个reduce。我们看一下整个wordcount的流程。

wordCount过程

​ 整个wordcount的流程,输入的数据在起始端的时候会被分片,分片的数据进入map instance以后,map instance会去读每一条记录,然后把key放在第一个参数位置,value会记录上1,下一个阶段是shuffer阶段,shuffer阶段的前一个部分会做一个排序,排序之后还会做一个combiner的操作,combiner阶段会把相同的key的value累加起来,组成一个新的key-value值,在shuffer的后半段,相同key的记录会进入相同的reduce instance上去,reduce阶段会再做一个combiner操作,最后输出数据。我们看一下哪些阶段容易发生长尾问题,首先是map阶段,map阶段有可能发生长尾,但是在实际应用中,这种现象不经常发生,如果发生了,大家可以一块跟踪一下整个问题。下一个容易发生长尾的地方是shuffer dispatch的地方,就是shuffer后期分配reduce的阶段。相同的key会分配到相同的reduce上去处理,如果一个key的数据非常非常多,那这个reduce真的是非常幸运了,他完成的时间可能会比其他的instance要长,这个时候长尾现象就发生了。

长尾原因

4.1 group by长尾问题

​ 我们接着看一下我们遇到的长尾的场景,以及怎么做优化?第一个场景就是group by ,比如我们app中某一个商品卖得很好,我们想要计算这个商品的pu,pv/uv,往往会做一个group by的操作。和我们刚才的wordcount例子一样,往往会是一个map+reduce,发生长尾的地方往往是shuffer dispatch的地方。所以如果发生了长尾,我们应该怎么解决呢?我们思考能不能把reduce这个hot key打散,不让他分发到同一个reduce instance。

Group By长尾问题

​ 系统提供了一个参数,这个参数会在shuffer dispatch的时候,算法里面会加入一个随机的因素,会更加均匀的将数据分发到reduce上。大家可以看到对比,在这个参数开启前,dag图是一个map+reduce,开启之后是一个map+两个reduce。这跟我们刚才的想法是一致的,中间的reduce的作用就是将hot key给打散。但是需要注意的是,这个参数仅仅对group by有效,如果长尾现象并不严重,用这种方法增加一次reduce,时间消耗反而更大。

解决方案

4.2 Count Distinct长尾问题

​ 再看另一个场景,count distinct长尾。当我们计算商品购买uv的时候,固定的特殊值比较多,也会发生长尾现象。我们可以不用distinct,用group by来改写。但是我们改写之后由26s变成了33秒。我们初衷是避免使用distinct,使用group by,并且开启了参数。这个实验充分说明如果长尾不是很严重,是否开启这个参数是值得商榷的。那还能怎么做呢?

Count Distinct长尾问题

​ 我们可以使用where 先把这个特殊值(比如null)给过滤掉,我们count完了之后再加1就好了,所以这些优化是需要根据用户具体的数据结构、类型规模来决定。

解决方案

4.3 动态分区长尾问题

​ 下一个场景是动态分区,在动态分区的场景里面,非常有可能发生长尾。上面是一个动态分区写入的一个语句,假如我们有n个map instance,我们的目标分区有m个,可能会产生n*m个小文件。大家都知道小文件过多会给我们后续查询带来很多的问题,所以系统针对这种情况是做了优化的,会尽可能避免产生过多的小文件。我们看一下执行计划,下面红框中有两个值,第一个是我们的分区,第二个值具体是0-10之间的一个数,系统为了避免小文件产生,做了一个reshuffer。reshuffer具体做了什么呢?会在shuffer dispatch的时候,相同目标分区的数据,会由最多10个reduce来处理,实际上减少了reduce的个数,也减少了小文件的产生。

动态分区长尾问题

​ 但是这个任务花费了3分钟,并且发生了长尾现象。我们想既可以避免小文件产生,又不想发生长尾,可以怎么做呢?

长尾问题

​ 手动的把reshuffer开关给关掉(这个开关默认是打开的)。任务在10s左右就跑完了,效果还是很明显的。所以如果目标分区不多,建议先关闭reshuttle;任务执行完毕后,手动执行MergeTask,减少小文件。这个方法比较折中,大家可以尝试一下。

解决方案

4.4 Join长尾问题

​ join长尾,join长尾还是相当普遍的,每个join也会被解释器解释成一个mr任务,map端读取join两边表,reduce做join操作,没有办法避免shuffer dispatch的时候产生的hot key的问题。

Join长尾问题

​ 很多人会建议用mapjoin,mapjoin的原理是将join的操作由reduce阶段移到map端去做,会把小表放到每一个map instance上面,每一个map instance都拥有小表所有的内容,可以在map本地进行join。但是可想而知,如果一个map instance能存下小表所有内容,单机内存要求还是非常高的。所以一般会有这个限制,mapjoin的小表占用的内存总和不得超过512MB。

解决方案一

​ 但是我们实际使用场景中,表的大小超过512,还是非常常见的。很多人会说我两边数据都很大,没有办法使用mapjoin。那应该怎么办呢,我们可以尝试使用分而治之的方法解决。长尾的问题就是热点key太多,发送到一个reduce instance,导致运行时间过长,我们可以把热点key找出来,或者找到一个范围。将热点key和非热点key分开处理。

  • (1)第一步我们可以先做个group by和order by找出热点item,并且把这些信息放到一个临时表中。
  • (2)找到这些热点key之后,可以将热点key先关联,一般来说数据不会很多,我们能用上mapjoin就尽量用上mapjoin,能整个提高第二步骤的执行效率。
  • (3)第三步将非常用item取出关联
  • (4)最后将两个部分union all合并,而且上面很多过程都是可以并行执行,所以分享给大家。

解决方案二


文章作者: 敲代码的乔帮主
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 敲代码的乔帮主 !
 上一篇
大数据学习之路 大数据学习之路
这个系列记录了自己在大数据方向的学习历程,大致包含几个方面,第一部分是linux和高并发基础知识;第二部分是Hadoop体系相关内容,包括hdfs、MapReduce、hive、hbase和ZK等多个知识模块;第三部分是Spark体系相关知识,包括scala、spark-core、spark-sql、spark-stream和storm等知识模块;第四部分是Flink实时计算相关,包括了Flink基础、Flink SQL和Flink CDC等相关知识。
2021-04-22
本篇 
浅谈MaxCompute SQL调优 浅谈MaxCompute SQL调优
今天跟大家分享的内容是maxcompute的优化内容,希望能够起到抛砖引玉的作用。我将从三个方面开展这次的主题分享,第一个部分是max和hive的对比;第二部分是调优方法论,主要是想总结一下我们从代码提交到执行结束,都有哪些位置可能会出现问题,一般情况下又是怎么解决的;最后是结合具体案例,对长尾问题的一些思考。
2021-04-21
  目录