你的位置:首页 > 软件开发 > Java > Spark Rdd coalesce()方法和repartition()方法

Spark Rdd coalesce()方法和repartition()方法

发布时间:2016-04-15 12:00:11
在Spark的Rdd中,Rdd是分区的。有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。有两种方法是可以 ...

Spark Rdd coalesce()方法和repartition()方法

在Spark的Rdd中,Rdd是分区的。

有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()。

 这两个方法有什么区别,看看源码就知道了:

 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)   : RDD[T] = withScope {  if (shuffle) {   /** Distributes elements evenly across output partitions, starting from a random partition. */   val distributePartition = (index: Int, items: Iterator[T]) => {    var position = (new Random(index)).nextInt(numPartitions)    items.map { t =>     // Note that the hash code of the key will just be the key itself. The HashPartitioner     // will mod it with the number of total partitions.     position = position + 1     (position, t)    }   } : Iterator[(Int, T)]   // include a shuffle step so that our upstream tasks are still distributed   new CoalescedRDD(    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),    new HashPartitioner(numPartitions)),    numPartitions).values  } else {   new CoalescedRDD(this, numPartitions)  } }

原标题:Spark Rdd coalesce()方法和repartition()方法

关键词:

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。

可能感兴趣文章

我的浏览记录