spark文件读取分区参数设置方法

简单记录一个spark的分区参数如何配置的问题。

最近发现一个案例,线上EMR的spark在相同sql和输入情况比另一个集群的spark on yarn要快很多,在排除其他额外因素后,发现EMR的任务在开始读取文件的阶段任务数远小于另一个。在读取阶段一般不需要过多task,划分过多的task反倒会浪费大量时间在网络IO上。因此,需要合理配置分区参数.
影响分区设置的参数有很多,我们常见会配置的分区参数诸如 spark.sql.files.maxPartitionBytesspark.default.parallelism,但事实上还不止。通过查看源码我们才可以清晰知道各种参数的存在意义和设置方法:
源码位置在org.apache.spark.sql.execution.datasources.FilePartition#maxSplitBytes

1
2
3
4
5
6
7
8
9
10
11
12
13
// 注意该方法只适用ocr等文件读取的情况
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum

Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}

这里涉及到四个参数

filesMaxPartitionBytes

对应参数 spark.sql.files.maxPartitionBytes

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

该参数指定单分区装入的最大字节数

filesOpenCostInBytes

对应参数spark.sql.files.openCostInBytes

The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It’s better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

该参数指定打开文件时预估的字节数,设置合适的大小可以提高打开文件的速度

filesMinPartitionNum

对应参数spark.sql.files.minPartitionNum

The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is spark.default.parallelism. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

该参数指定文件最小的并行度,用来获取每个分区可以读取的最大字节数

leafNodeDefaultParallelism

对应参数 spark.sql.leafNodeDefaultParallelism

The default parallelism of Spark SQL leaf nodes that produce data, such as the file scan node, the local data scan node, the range node, etc. The default value of this config is ‘SparkContext#defaultParallelism’.

对于大部分情况,这个参数其实就等于 spark.default.parallelism

分享到: