但是最近在测试spark on k8s的时候,遇到了一些性能问题,于是记录一下排查过程,做一下案例的复盘。
案例再现
我们使用的底层集群是AWS的eks集群,在此之上搭建了一套传统的hadoop yarn+ spark环境。通俗来说就是将yarn的resourceManager,nodeManager等具体的组件转化成k8s的pod,但是上层调度逻辑不变,做了一套两层调度的系统。该系统的具体实现方案和调度逻辑这里按下不表,因为这和今天讲述的性能优化案例关系不大,我们只要了解到这是传统的yarnspark,但是底层是k8s就好。
我采用的任务是来自我们生产环境的一个小任务,期望运行时间5min,数据量中等,输出表数据大约在2kw行。原先生产环境采用的是AWS的EMR on EC2集群,可以理解是传统的容器主机集群。现在将该任务迁移到eks集群后,时长达到了43min,于是观察其history ui:
发现其sql执行的阶段耗时与生产环境的耗时基本类似,阶段执行时长只有几分钟,但是整个spark job完成耗时却用了43min,明显在job执行尾部存在耗时问题,导致长尾现象发生。
初步分析
由于采用生产任务一样的sql,读写数据量也完全一致,排除业务逻辑导致的影响;
排查driver日志,发现阶段结束后几乎没有有效日志,此时所有task已经执行完成,观察集群,executor的利用率也非常低。
既然存在耗时,后台一定有耗时的线程在运行,于是查看spark driver的thread dump
找到的真正的原因发现卡在s3的rename操作上
s3上rename操作带来的性能问题
首先谈一下rename。 spark的rename是指在spark提交作业过程中,为了保持数据的一致性,会生成临时文件来读写数据,当task执行完毕,会将临时文件rename为正式文件;当job执行完毕,会把该job的临时目录下的所有文件rename为正式文件
其目录格式大概如图所示
driver会通过FileFormatWriter选择合适的output committer并启动writer job,committer会决定如何提交job和task,提交流程如下图
rename就发生在其中提交job和提交task的环节,具体的rename策略根据committer的策略而定。关于committer的细节等会再提。
为什么s3上rename会有性能问题
AWS 的s3 包括大多数对象存储,目录本身就是一个对象,因此,其目录rename需要经历list-copy-delete 的操作,相对与文件系统例如HDFS简单的rename,其开销自然非常大。在spark运行中,可能会生成非常多的小文件,即使是HDFS,要进行数万计小文件的rename,其性能尚且需要优化,更不要说s3了。
spark的文件提交协议
在谈及如何优化之前,我们先回顾一下与之相关的Spark文件提交过程。从上一张图可以看到,Spark在job 提交过程中,实际上依然是调用的Hadoop的committer来采取具体的commit的策略。而committer要解决的问题,主要有以下几点:
- 处理文件失败重读导致的数据一致性问题
- 保证task推测执行下相同文件多写时的数据正确性
- 提高海量文件读写,合并的效率
目前Hadoo提供的两种文件提交方式,通过mapreduce.fileoutputcommitter.algorithm.version
进行切换
FileOutputCommitter V1
commit过程
- 首先TaskAttempt会将TaskAttempt Data写入一个临时目录:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- 当task data写完,可以提交task后,执行commitTask,将上述目录转移到正式目录:
${target_dir}/_temporary/${appAttempt}/${taskAttempt}
- 当所有task 完成,执行commitJob, 将
${target_dir}/_temporary
下所有文件及目录转移到${target_dir}
正式目录,并在提交完成后当前目录添加标识符_SUCCESS
来表示标识提交成功
数据一致性问题
- 在TaskAttempt写入的阶段,如果发生task写失败需要重试,只需要重写
${taskAttempt}
目录下/_temporary/
下的所有文件就行,可以保留原先正式的Attempt目录 - 如果发生application重试,也可以通过recoverTask直接恢复原先
${appAttempt}
目录下的正式目录文件,直接重命名到当前${appAttempt}
目录下 - 由于存在两次rename,V1实际上是两阶段提交,rename前后数据的一致性都能得到保证,数据不一致的情况只有可能发生在rename的过程中
性能问题
V1的强一致性带来的负面作用就是两次rename的操作在海量文件生成的情景中可能导致耗时问题,尤其是commitJob阶段,由于是Driver单线程串行执行commit,此时如果需要rename大量文件, 其耗时可能会非常长
FileOutputCommitter V2
- 首先TaskAttempt会将TaskAttempt Data写入一个临时目录:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- 当task data写完,可以提交task后,执行commitTask,将上述目录转移到
${target_dir}
. 注意这里是直接移动到正式目录 - 当所有task 完成,由于所有data已经保存在正式目录下,所以commitJob就是单纯添加标识符
_SUCCESS
来表示标识提交成功
数据一致性问题
1.在taskAttempt写入的阶段,如果发生task写失败重试,由于此时可能task data已经被移动到正式目录,因此此时会出现脏数据
2.如果发生application重试,由于之前application已提交的数据已经存在在正式目录,因此无需额外的重命名操作,直接继续进行其他数据的重试即可,当然,此时已提交的数据不一定完全正确,其中可能存在脏数据。
3.可见V2过程牺牲一定的数据一致性,选择了最终一致性的方案,由于缺乏中间过程对数据正确性的保证,所以只能通过最后的_SUCCESS标识符来决定数据是否正确。同时,这还会带来另一个问题,由于存在脏数据,在任务长期运行中,这些脏数据可能无法被正确清理,从而占据存储空间,带来额外的开销
性能问题
V2之所以采取最终一致性的方案,目录就是减少V1 rename操作过多带来的耗时开销。相比V1,V2只需要在task完成后rename到正式目录,而且可以通过task线程的并行操作,其执行的时长会被大大降低
小文件优化
虽然上述的Committer的不同算法在一致性和性能上给了大家选择,但毕竟各有利弊。但在实际场景下,大家的选择总是希望“我全都要”
除了在rename阶段进行优化外,性能杀手的源头:对海量小文件的优化也成为了一个行之有效的方法。
Spark现有的优化:
在spark中内置有对小文件的优化,从文件生成角度:
spark.files.openCostInBytes
利用这个参数设置预估打开文件的大小,设置高一点可以提高小文件分区的速度
从业务侧考虑,大致思路是减少分区数来将小文件合并成大文件
- 使用coalesce或repartition操作合并分区。
- 减少使用动态分区或者使用
distribute by
来控制reducer个数 - 多使用分区表来降低查询时产生的分区数量
- 或者使用更先进的文件压缩格式来提高小文件处理性能
AWS的特殊优化:
由于我们在生产环境中使用了AWS 的EMR,也稍微了解了一下AWS团队在s3上小文件优化上的措施
- multi upload:其原理就是利用并发读写文件片段来提高处理s3读写的性能,基于此,衍生出EMRFS S3-optimized Committer和s3a Committer(开源),注意,该committer默认采用FileOutputCommitter V2方式提交,因此V2存在的问题在这些committer上也都会存在。
- 利用hdfs加速,在EMR中,考虑到文件系统对于rename等操作具有更好的性能,那我们不是可以在文件系统上先rename,再提交到s3上?在EMR中,就是在文件提交到s3前,先上传到类hdfs的文件系统上进行rename或者文件合并操作后再上传到s3上,这样比起纯s3读写,在性能上会有明显收益。当然坏处就是单独维护一个文件系统具有较高成本。
其他优化思路:
我们团队也在小文件合并上进行了优化,其优化思路就是在Job执行的最后,新建一个job用于合并小文件,通过继承Spark的SqlHadoopMapReduceCommitProtocol
来实现插件式的扩展
合并的思路是在commitTask之后,获取数据的分区信息,然后进行分组合并,最后在commitJob的时候直接将合并完的文件转移到正式目录中。其基本思路如下图
这样合并小文件的优点
该功能是插拔式的,对原生代码的侵入性较低
在海量小文件场景下优势明显
缺点
- 新起一个job进行优化,在任务最后会新增两个阶段用于小文件合并,会引入更多的task,带来一定的耗时
尾声
通过启动该功能,我重新跑一遍任务,最终耗时下降明显降低:
当然优化并未完全结束,在eks上的任务耗时总体还是要比原EMR任务高,但这块问题的深入排查,等待下次有时间再分享吧