Spark笔记(四)-Spark之RDD-分区

By timebusker on June 25, 2018

基础知识

分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每一个分片称为分区,分区的格式决定了并行计算的粒度, 而每个分区的数值计算都是在一个任务中进行的,因此任务的个数,也是由RDD(准确来说是作业最后一个RDD)的分区数决定。

为什么要进行分区

数据分区,在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件, 它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。

Spark里面io也是不可避免的,但是网络传输spark里面进行了优化:

Spark把rdd进行分区(分片),放在集群上并行计算。同一个rdd分片100个,10个节点,平均一个节点10个分区,当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum, 所以进行sum型计算对网络传输非常小。但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。spark是如何优化这个问题的呢?

Spark把key-value rdd通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上,这样对改rdd进行key聚合时,就不需要shuffle过程,我们进行mapreduce计算的时候为什么要进行shuffle? 就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络, 然后它才能把相同的key走到一起。要进行shuffle是存储决定的。

Spark从这个教训中得到启发,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t, 它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区, spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。

key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。

进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。

需要在工作节点间进行数据混洗的转换极大地受益于分区。这样的转换是cogroupgroupWithjoinleftOuterJoinrightOuterJoingroupByKeyreduceByKeycombineByKeylookup

Spark分区原则及方法

RDD分区的一个分区原则:尽可能是得分区的个数等于集群cpu core数目

无论是本地模式、Standalone模式、YARN模式或Mesos模式,我们都可以通过spark.default.parallelism来配置其默认分区个数,若没有设置该值,则根据不同的集群环境确定该值

partitionRDD的最小数据处理单元,可以看作是一个数据块,每个partition有个编号index。一个partition被一个map task处理。

spark.default.parallelism,默认的并发数(默认值2

当配置文件spark-default.conf中没有显示的配置,则按照如下规则取值:

本地模式

该模式下,不会启动executor,由SparkSubmit进程生成指定数量的线程数来并发。

spark-shell spark.default.parallelism = 1

spark-shell --master local[N] spark.default.parallelism = N (使用N个核)

spark-shell --master local spark.default.parallelism = 1
伪集群模式

x为本机上启动的executor数,y为每个executor使用的core数,z为每个 executor使用的内存

spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x*y
yarn/standalone模式
#  Others: total number of cores on all executor nodes or 2, whichever is larger
spark.default.parallelism =  max(所有executor使用的core总数, 2)

经过上面的规则,就能确定了spark.default.parallelism的默认值(前提是配置文件spark-default.conf中没有显示的配置,如果配置了,则spark.default.parallelism = 配置的值)

还有一个配置比较重要,spark.files.maxPartitionBytes = 128 M(默认) The maximum number of bytes to pack into a single partition when reading files. 代表着rdd的一个分区能存放数据的最大字节数,如果一个400m的文件,只分了两个区,则在action时会发生错误。

当一个spark应用程序执行时,生成sparkContext,同时会生成两个参数,由上面得到的spark.default.parallelism推导出这两个参数的值

sc.defaultParallelism     = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

当sc.defaultParallelism和sc.defaultMinPartitions最终确认后,就可以推算rdd的分区数了。

RDD分区的数据取决于哪些因素?

  • 如果是将Driver端的Scala集合并行化创建RDD,并且没有指定RDD的分区,RDD的分区就是为该app分配的核数
  • 如果是重hdfs中读取数据创建RDD,并且设置了最新分区数量是1,那么RDD的分区数据即使输入切片的数据, 如果不设置最小分区的数量,即spark调用textFile时会默认传入2,那么RDD的分区数量会打于等于输入切片的数量

  • Spark 2.3.X版本对分区有负载优化操作,对所有数据量求平均值,针对异常较大数据块进行二次切割(逻辑切割)计算

分区器(partitioner)

MR任务的map阶段的处理结果会进行分片(也可以叫分区,这个分区不同于上面的分区),分片的数量就是reduce task的数量。

spark中默认定义了两种partitioner:

  • 哈希分区器(Hash Partitioner) hash分区器会根据key-value的键值key的hashcode进行分区,速度快,但是可能产生数据偏移,造成每个分区中数据量不均衡。

  • 范围分区器(Range Partitioner) range分区器会对现有rdd中的key-value数据进行抽样,尽量找出均衡分割点,一定程度上解决了数据偏移问题,力求分区后的每个分区内数据量均衡,但是速度相对慢。

partitioner分区详情

在对父RDD执行完Map阶段任务后和在执行Reduce阶段任务前,会对Map阶段中间结果进行分区。分区由父RDD的partitioner确定,主要包括两部分工作:

  • 确定分区数量(也就是reduce task数量),也是子RDD的partition数量。
  • 决定将Map阶段中间结果的每个key-value对分到哪个分区上。

假设一个父RDD要执行reduceByKey任务,我们可以显式的指定分区器:

val rdd_child = rdd_parent.reduceByKey(new HashPartitioner(3), _+_)

HashPartitioner构造参数3就是分区数量,也是启动的reduce task数量,也是reduceByKey结果返回的子RDD的partitions方法返回的数组的长度。

如果没有显式指定分区器,则会调用org.apache.spark包下伴生对象Partitioner的defaultPartitioner静态方法返回的分区器作为默认分区器。

  • defaultPartitioner返回默认分区器的过程如下:

尝试利用父RDD的partitioner,如果父RDD没有partitioner,则会查看sparkConf中是否定义了spark.default.parallelism配置参数, 如果定义了就返回new HashPartitioner(sc.defaultParallelism)作为默认分区器, 如果没定义就返回new HashPartitioner(rdd_parent.partitions.length)作为默认分区器。

//org.apache.spark包下伴生对象object Partitioner的方法
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = (Seq(rdd) ++ others)
  val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  if (hasPartitioner.nonEmpty) {
    hasPartitioner.maxBy(_.partitions.length).partitioner.get
  } else {
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(rdds.map(_.partitions.length).max)
    }
  }
}

无论是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式来运行 Apache Spark, 分区的默认个数等于对spark.default.parallelism的指定值,若该值未设置,则 Apache Spark 会根据不同集群模式的特征,来确定这个值。

自定义分区

继承Partitioner,需要实现numPartitions,getPartition2个方法。

class MyPartitoiner(val numParts:Int) extends  Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new URL(key.toString).getHost
    val code = (domain.hashCode % numParts)
    if (code < 0) {
      code + numParts
    } else {
      code
    }
  }
}

object DomainNamePartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word count").setMaster("local")

    val sc = new SparkContext(conf)

    val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2),
      ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4),
      ("http://baidu.com/test", 4)))
    //Array[Array[(String, Int)]]
    // = Array(Array(),
    // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4),
    // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3)))
    val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))
    hashPartitionedRDD.glom().collect()

    //使用spark-shell --jar的方式将这个partitioner所在的jar包引进去,然后测试下面的代码
    // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar
    val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2))
    val array = partitionedRDD.glom().collect()

  }
}