Spark调优的第一步是资源调优: 

    1、搭建Spark集群的时候要给Spark集合足够的资源Core,Memrory

           在 Spark 安装包的conf下的 Spark-env.sh  中配置

                    SPARK_WORKER_CORES   每个Worker的使用的Core数

                    SPARK_WORKER_MEMROEY  每个Worker使用的内存

                    SPARK_WORKER_INSTANCE  每个节点上Worker的个数,默认为1个,每个Worker默认一个Executor

    2、在提交Application的时候给Application分配更多的资源

            ① submit命令提交时指定选项

                    --executor-cores  每个executor使用的core数

                    --executor-memory  每个executor使用的内存大小

                    --total-executor-cores  所有executor总共使用的core数

            ② 修改配置信息 (在Application的代码中设置、spark-submit --conf、或在 Spark-default.xml(.conf) 中设置)

                    spark.executor.cores

                    spark.executor.memory

                    spark.max.cores

            问题

                    30 executor 1core 1G

                    10 executor 3core 3G

                    分情况:    

                            看Worker节点数,如果Worker节点数小于10的话,第二种比较好。

Spark调优的第二步是是任务的并行度调优:

            原则:

                    一个core一般分配 2~3个task,这样当一个core较早地执行完它地task,不用等待其他core执行完,而是继续执行第二个。

                    每一个task一般处理 1G 的数据 (WordCount难度的task),如果是复杂的运算,处理的数据可以减少到500M

            提高并行度的方式

                    1、如果读取的数据在hdfs上,可以增加block块个数,即减小block块大小

                                 block块多,RDD的partition则增加,就提高了并行度,但是这种方法很少用。

                    2、sc.parallelize(list, numPartitions)

                                测试时可以在解析数组时传入分区数。

                    3、sc.textFile(path, numPartitions)

                                读取文件时,可以指定分区数,即并行度

                    4、coalesce、repartition算子提高分区数

                                较常用,如果一个分区数据量很大,可以增加分区来平摊数据。

                    5、在配置信息中指定默认的并行度

                                spark.default.parallelism    默认无值,指定默认并行度。

                                spark.sql.shuffle.partitons   默认200,指定Spark-SQL的shuffle时的并行度,即Reduce task的个数。实际情况下200都过小。

                    6、自定义分区器 partitioner

                                使用 partitionBy() 等算子,传入实现的分区器,实现方法:

                                         numPartition()  在这个方法中指定partition的个数。

                                         getPartition()

Spark调优的第三步是是代码调优:

              1、避免创建重复的RDD,在执行效率上没有区别,但是在代码乱。

              2、在其他Job重复使用的RDD要使用持久化算子

                        cache :  MEMROY_ONLY

                        persist:  持久化级别推荐优先的顺序

                                MEMORY_ONLY

                                MEMORY_ONLY_SER  :对数据序列化

                                MEMORY_AND_DISK_SER   :为什么不使用 MEMORY_AND_DISK,应该这两个都会优先存储在内存中,也就是MEMORY_AND_DISK内存的占用大小与MEMORY_ONLY还是一样的,倒不如将数据序列化压缩一把,采用MEMORY_AND_DISK_SER,这样内存中因为序列化而能存储更多的数据,只是增加计算序列化的性能。

                                尽量不要选用带有 _2  _disk 的持久化级别,有些场景才磁盘读取数据还不如重新计算快。

                        checkpoint:

                                如果这个RDD的 计算时间比较长或者计算起来比较复杂 ,一般将这个RDD的计算结果存储到hdfs上,这样数据会更加安全。

                                如果RDD的 依赖关系非常长,可以 切断这个RDD依赖关系,提高容错效率。

              3、尽量使用广播变量

                        使用广播变量可以大大的降低集群中变量的副本数

                                不使用广播变量        每个task都有一个副本

                                使用广播变量            每个excutor一个副本

                        广播变量最大可以多大?

                                 Executor Memory * 60% * 90% * 80%

                        如何知道广播出去的这个RDD的计算结果占用多大的内存?

                                在WEB UI中查看storage lable查看这个RDD占用内存的大小

                       spark.sql.autoBroadcastJoinThreshold  单位kb

                                 Spark SQL如果某一个表小于这个值 他会自动广播出去

                4、尽量少使用产生shuffle的算子

                         例如当两个rdd进行join,其中一个RDD数据比较少,将从rdd的数据拉取到Driver端,在广播出去,进行map端join等

                         join算子 = 广播变量 + filter\map\flatMap

                5、尽量使用有combiner的shuffle类算子

                        combiner概念:

                               在map端,每一个map task计算完毕以后进行的局部聚合

                        combiner好处:

                                1、降低shuffle拉取数据量,减少网络传输时间

                                3、降低reduce 端聚合的次数

                        reduceByKey:

                                这个算子在map端是有combiner的。

                                在某一些场景下可以使用reduceByKey来代替groupByKey

                        AggregateByKey

                        combineByKey

                6、尽量使用高性能的算子

                        mapPartition 代替 map

                                遍历以partition为单位,需要每个partition的数据量不是太大。

                                逻辑不需要所有key在一个分区时可以替代

                        foreachPartition 代替 foreach

                        filter + coalesce

                                当执行filter过滤完数据后,可能数据量会少很多,这是可以用coalesce减少分区

                        repartition

                                增加分区,提高并行度

                        repartitionAndSortWithinPartitions = repartition + sort(每一个partition内部排序)

                                传入分区器partitioner, 和 比较器comparator

                                进行分区,并且在每个分区内排序

            7、使用Kryo优化序列化性能

                        在Spark中,主要有三个地方涉及到了序列化

                                • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

                                • 将自定义的类型作为RDD的泛型类型时,所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

                                • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

                        Kryo序列化器介绍:

                                • Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

                                • 对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。

                       使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

            8、优化数据结构

                        Java中,有三种类型比较耗费内存:

                                • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。

                                • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

                                • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

                        因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构。

                                •  尽量使用字符串替代对象。

                                •  使用原始类型(比如Int、Long)替代字符串,

                                •  使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

            9、使用高性能的库fastutil

                        fastutil介绍:

                                 fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue。

                       fastutil特点:

                               • fastutil能够提供更小的内存占用,更快的存取速度。

                                            我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度。

                               • fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口)。

                                            因此可以直接放入已有系统的任何代码中。

                               • fastutil最新版本要求Java 7以及以上版本。

Spark调优的第四步是是Shuffle调优:

                详细介绍:Spark Shuffle 详解与优化

                1、提高shuffle write 写磁盘时的buffer大小

                                spark.shuffle.file.buffer  默认32K

                2、增加或减少每次 reduce task 每次最大拉取的数据量

                                spark.reducer.maxSizeInFlight  默认48M

                3、增加 executor  中用于shuffle聚合的内存比例

                                spark.shuffle.memoryFraction  默认0.2

                4、增加数据拉取失败时 最大的重试次数

                                spark.shuffle.io.maxRetries  默认3次

                5、数据拉取失败时 重试拉取数据的等待间隔

                                spark.shuffle.io.retryWait  默认5s

                6、选用合适的 sort manager

                                小数据量使用HashShuffleManager,大数据量使用SortShuffleManager

                               spark.shuffle.manager  默认sort

                7、开启HashShuffleManager的 consolidate 合并机制

                                spark.shuffle.consolidateFiles   默认false

                8、调大 SortShuffleManager 的By Pass机制的阈值

                                当reduce task小于阈值则不进行排序。

                               spark.shuffle.sort.bypassMergeThreshold  默认200


调节堆外内存:

    同一个进程(程序)称为堆内,不属于该程序内存——堆外

    概述:

         Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外内存netty是零拷贝,所以是使用内核空间内存),所以使用了堆外内存

        默认情况下,这个堆外内存上限默认 是每一个executor的内存大小的10%

        真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G。

    什么时候需要调节Executor的堆外内存大小?

        • shuffle file cannot find (DAGScheduler,resubmitting task)

        • executor lost

        • task lost

        • out of memory

    问题原因:

        一个Executor在进行shuffle write,优先从自己本地关联的BlockManager中获取某份数据,如果本地block manager没有的话,那么会通过BlockTransferService,会尝试建立远程的网络连接去远程连接其他节点上executor的blockmanager去获取。那么在这个过程出现错误,可能有如下原因:

            • Executor 由于 内存不足 或者 堆外内存不足 了,挂掉了,对应的Executor上面的 blockmanager也挂掉  了,找不到对应的shuffle map output文件,使得reducer端不能够拉取数据

            • Executor并没有挂掉,而是在拉取数据的过程出现了问题

                          Executor用于shuffle聚合的内存不够用  ,或者堆外内存不够用,使得BlockManager拉取数据失败

                          或者Map端的Executor由于内存不够用在 频繁GC,JVM让所以工作进程停止工作,导致 BlockManager连接超时

    解决办法

        上述情况下,就可以去考虑调节一下 调大用于shuffle聚合内存比例  或者 调大executor的堆外内存 ,也许就可以避免报错:

                    调大堆外内存比例方法:  (spark-submit脚本里面,去用--conf的方式,去添加配置,一定要注意)

                            • yarn下:

                                --conf spark.yarn.executor.memoryOverhead=2048 单位M

                            • standlone下:

                                --conf spark.executor.memoryOverhead=2048

                    BlockManager connection timeout ,则提高连接等待时间:

                                --conf spark.core.connection.ack.wait.timeout=300  默认60s

                    BlockManager fetch data fail ,则提高拉取数据的重试次数以及间隔时间    

                         

Spark调优的第六步是故障解决(troubleshooting):

    shuffle file cannot find : 磁盘小文件找不到了

    分析原因:

    解决办法:

        1、connection timeout --- shufle file cannot find

            提高建立连接的超时时间

            降低Executor GCtime    

                1、提高Executor内存大小

                2、降低存储内存比例或者降低聚合内存比例

        2、fetch data fail ---  shuffle file cannot find

            提高拉取数据的重试次数以及间隔时间

        3、OOM/execut lost   ---  shuffle file cannot find

            提高堆外内存大小

            提高堆内内存大小

            

    reduce OOM  

    分析原因:   

        BlockManager拉取的数据量大,reduce task处理的数据量小

    解决办法:

        1、降低每次拉取的数据量

        2、提高shuffle聚合的内存比例。。。。

        3、提高Executor的内存比例 水涨船高



相关:

添加新评论